648540858
2024-04-23 d41d6b34af2485198ed01e1888db1571e4da1a6a
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -1,8 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@@ -19,16 +17,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -38,10 +36,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
   private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
   private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
   @Autowired
@@ -56,180 +50,150 @@
   @Autowired
   private IDeviceChannelService deviceChannelService;
   @Autowired
   private DynamicTask dynamicTask;
   @Autowired
   private CivilCodeFileConf civilCodeFileConf;
   @Autowired
   private SipConfig sipConfig;
   private final static String talkKey = "notify-request-for-mobile-position-task";
//   @Async("taskExecutor")
   public void process(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         long startTime = System.currentTimeMillis();
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
            logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
            return;
         }
         Device device = redisCatchStorage.getDevice(deviceId);
         MobilePosition mobilePosition = new MobilePosition();
         mobilePosition.setCreateTime(DateUtil.getNow());
         List<Element> elements = rootElement.elements();
         String channelId = null;
         for (Element element : elements) {
            switch (element.getName()){
               case "DeviceID":
                  channelId = element.getStringValue();
                  if (device == null) {
                     device = redisCatchStorage.getDevice(channelId);
                     if (device == null) {
                        // 根据通道id查询设备Id
                        List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                        if (!deviceList.isEmpty()) {
                           device = deviceList.get(0);
                        }
                     }
                  }
                  if (device == null) {
                     logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                     return;
                  }
                  mobilePosition.setDeviceId(device.getDeviceId());
                  mobilePosition.setChannelId(channelId);
                  // 兼容设备部分设备上报是通道编号与设备编号一致的情况
                  if (deviceId.equals(channelId)) {
                     List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                     if (deviceChannels.size() == 1) {
                        channelId = deviceChannels.get(0).getChannelId();
                     }
                  }
                  if (!ObjectUtils.isEmpty(device.getName())) {
                     mobilePosition.setDeviceName(device.getName());
                  }
                  mobilePosition.setDeviceId(device.getDeviceId());
                  mobilePosition.setChannelId(channelId);
                  continue;
               case "Time":
                  String timeVal = element.getStringValue();
                  if (ObjectUtils.isEmpty(timeVal)) {
                     mobilePosition.setTime(DateUtil.getNow());
                  } else {
                     mobilePosition.setTime(SipUtils.parseTime(timeVal));
                  }
                  continue;
               case "Longitude":
                  mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                  continue;
               case "Latitude":
                  mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                  continue;
               case "Speed":
                  String speedVal = element.getStringValue();
                  if (NumericUtil.isDouble(speedVal)) {
                     mobilePosition.setSpeed(Double.parseDouble(speedVal));
                  } else {
                     mobilePosition.setSpeed(0.0);
                  }
                  continue;
               case "Direction":
                  String directionVal = element.getStringValue();
                  if (NumericUtil.isDouble(directionVal)) {
                     mobilePosition.setDirection(Double.parseDouble(directionVal));
                  } else {
                     mobilePosition.setDirection(0.0);
                  }
                  continue;
               case "Altitude":
                  String altitudeVal = element.getStringValue();
                  if (NumericUtil.isDouble(altitudeVal)) {
                     mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                  } else {
                     mobilePosition.setAltitude(0.0);
                  }
                  continue;
   @Transactional
   public void process(List<RequestEvent> eventList) {
      if (eventList.isEmpty()) {
         return;
      }
      Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
      List<MobilePosition> addMobilePositionList = new ArrayList<>();
      for (RequestEvent evt : eventList) {
         try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            long startTime = System.currentTimeMillis();
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
               logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
               return;
            }
         }
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
               logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
               return;
            }
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setDeviceId(device.getDeviceId());
            mobilePosition.setDeviceName(device.getName());
            mobilePosition.setCreateTime(DateUtil.getNow());
            List<Element> elements = rootElement.elements();
            for (Element element : elements) {
               switch (element.getName()){
                  case "DeviceID":
                     String channelId = element.getStringValue();
                     if (!deviceId.equals(channelId)) {
                        mobilePosition.setChannelId(channelId);
                     }
                     continue;
                  case "Time":
                     String timeVal = element.getStringValue();
                     if (ObjectUtils.isEmpty(timeVal)) {
                        mobilePosition.setTime(DateUtil.getNow());
                     } else {
                        mobilePosition.setTime(SipUtils.parseTime(timeVal));
                     }
                     continue;
                  case "Longitude":
                     mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                     continue;
                  case "Latitude":
                     mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                     continue;
                  case "Speed":
                     String speedVal = element.getStringValue();
                     if (NumericUtil.isDouble(speedVal)) {
                        mobilePosition.setSpeed(Double.parseDouble(speedVal));
                     } else {
                        mobilePosition.setSpeed(0.0);
                     }
                     continue;
                  case "Direction":
                     String directionVal = element.getStringValue();
                     if (NumericUtil.isDouble(directionVal)) {
                        mobilePosition.setDirection(Double.parseDouble(directionVal));
                     } else {
                        mobilePosition.setDirection(0.0);
                     }
                     continue;
                  case "Altitude":
                     String altitudeVal = element.getStringValue();
                     if (NumericUtil.isDouble(altitudeVal)) {
                        mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                     } else {
                        mobilePosition.setAltitude(0.0);
                     }
                     continue;
               }
            }
//         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//               mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
         mobilePosition.setReportSource("Mobile Position");
            mobilePosition.setReportSource("Mobile Position");
         // 更新device channel 的经纬度
         DeviceChannel deviceChannel = new DeviceChannel();
         deviceChannel.setDeviceId(device.getDeviceId());
         deviceChannel.setChannelId(channelId);
         deviceChannel.setLongitude(mobilePosition.getLongitude());
         deviceChannel.setLatitude(mobilePosition.getLatitude());
         deviceChannel.setGpsTime(mobilePosition.getTime());
         updateChannelMap.put(deviceId + channelId, deviceChannel);
         addMobilePositionList.add(mobilePosition);
         if(updateChannelMap.size() > 100) {
            executeSaveChannel();
         }
         if (userSetting.isSavePositionHistory()) {
            if(addMobilePositionList.size() > 100) {
               executeSaveMobilePosition();
            // 更新device channel 的经纬度
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(device.getDeviceId());
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
            addMobilePositionList.add(mobilePosition);
            // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
            try {
               eventPublisher.mobilePositionEventPublish(mobilePosition);
            }catch (Exception e) {
               logger.error("[向上级转发移动位置失败] ", e);
            }
            if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) {
               List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
               channels.forEach(channel -> {
                  // 发送redis消息。 通知位置信息的变化
                  JSONObject jsonObject = new JSONObject();
                  jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                  jsonObject.put("serial", channel.getDeviceId());
                  jsonObject.put("code", channel.getChannelId());
                  jsonObject.put("longitude", mobilePosition.getLongitude());
                  jsonObject.put("latitude", mobilePosition.getLatitude());
                  jsonObject.put("altitude", mobilePosition.getAltitude());
                  jsonObject.put("direction", mobilePosition.getDirection());
                  jsonObject.put("speed", mobilePosition.getSpeed());
                  redisCatchStorage.sendMobilePositionMsg(jsonObject);
               });
            }else {
               // 发送redis消息。 通知位置信息的变化
               JSONObject jsonObject = new JSONObject();
               jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
               jsonObject.put("serial", mobilePosition.getDeviceId());
               jsonObject.put("code", mobilePosition.getChannelId());
               jsonObject.put("longitude", mobilePosition.getLongitude());
               jsonObject.put("latitude", mobilePosition.getLatitude());
               jsonObject.put("altitude", mobilePosition.getAltitude());
               jsonObject.put("direction", mobilePosition.getDirection());
               jsonObject.put("speed", mobilePosition.getSpeed());
               redisCatchStorage.sendMobilePositionMsg(jsonObject);
            }
         } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
         }
//         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//         deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
         if (!dynamicTask.contains(talkKey)) {
            dynamicTask.startDelay(talkKey, this::executeSave, 3000);
         }
      } catch (DocumentException e) {
         logger.error("未处理的异常 ", e);
      }
   }
   private void executeSave(){
      executeSaveChannel();
      executeSaveMobilePosition();
      dynamicTask.stop(talkKey);
   }
   @Async("taskExecutor")
   public void executeSaveChannel(){
      dynamicTask.execute();
      try {
         logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
//         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
//         deviceChannelService.batchUpdateChannelGPS(deviceChannels);
      if(!updateChannelMap.isEmpty()) {
         List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
         logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
         deviceChannelService.batchUpdateChannelGPS(channels);
         updateChannelMap.clear();
      }catch (Exception e) {
      }
   }
   @Async("taskExecutor")
   public void executeSaveMobilePosition(){
      if (userSetting.isSavePositionHistory()) {
      if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
         try {
//            logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
//            deviceChannelService.batchAddMobilePosition(addMobilePositionList);
            addMobilePositionList.clear();
            logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
            deviceChannelService.batchAddMobilePosition(addMobilePositionList);
         }catch (Exception e) {
            logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
         }
         addMobilePositionList.clear();
      }
   }
}