648540858
2024-04-22 8cba63642fcff122907bd7d7a8d7d822555d34ca
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,7 +11,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -20,12 +19,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,78 +67,100 @@
   private final static String talkKey = "notify-request-for-mobile-position-task";
//   @Async("taskExecutor")
   public void process(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         long startTime = System.currentTimeMillis();
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
            logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
            return;
         }
         Device device = redisCatchStorage.getDevice(deviceId);
         MobilePosition mobilePosition = new MobilePosition();
         mobilePosition.setCreateTime(DateUtil.getNow());
         List<Element> elements = rootElement.elements();
         String channelId = null;
         for (Element element : elements) {
            switch (element.getName()){
               case "DeviceID":
                  channelId = element.getStringValue();
                  if (device == null) {
                     device = redisCatchStorage.getDevice(channelId);
                     if (device == null) {
                        // 根据通道id查询设备Id
                        List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                        if (!deviceList.isEmpty()) {
                           device = deviceList.get(0);
                        }
                     }
                  }
                  if (device == null) {
                     logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                     return;
                  }
                  mobilePosition.setDeviceId(device.getDeviceId());
                  mobilePosition.setChannelId(channelId);
                  // 兼容设备部分设备上报是通道编号与设备编号一致的情况
                  if (deviceId.equals(channelId)) {
                     List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                     if (deviceChannels.size() == 1) {
                        channelId = deviceChannels.get(0).getChannelId();
                     }
                  }
                  if (!ObjectUtils.isEmpty(device.getName())) {
                     mobilePosition.setDeviceName(device.getName());
                  }
                  mobilePosition.setDeviceId(device.getDeviceId());
                  mobilePosition.setChannelId(channelId);
                  continue;
               case "Time":
                  String timeVal = element.getStringValue();
                  if (ObjectUtils.isEmpty(timeVal)) {
                     mobilePosition.setTime(DateUtil.getNow());
                  } else {
                     mobilePosition.setTime(SipUtils.parseTime(timeVal));
                  }
                  continue;
               case "Longitude":
                  mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                  continue;
               case "Latitude":
                  mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                  continue;
               case "Speed":
                  String speedVal = element.getStringValue();
                  if (NumericUtil.isDouble(speedVal)) {
                     mobilePosition.setSpeed(Double.parseDouble(speedVal));
                  } else {
                     mobilePosition.setSpeed(0.0);
                  }
                  continue;
               case "Direction":
                  String directionVal = element.getStringValue();
                  if (NumericUtil.isDouble(directionVal)) {
                     mobilePosition.setDirection(Double.parseDouble(directionVal));
                  } else {
                     mobilePosition.setDirection(0.0);
                  }
                  continue;
               case "Altitude":
                  String altitudeVal = element.getStringValue();
                  if (NumericUtil.isDouble(altitudeVal)) {
                     mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                  } else {
                     mobilePosition.setAltitude(0.0);
                  }
                  continue;
         Element deviceIdElement = rootElement.element("DeviceID");
         String channelId = deviceIdElement.getTextTrim().toString();
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device == null) {
            device = redisCatchStorage.getDevice(channelId);
            if (device == null) {
               // 根据通道id查询设备Id
               List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
               if (deviceList.size() > 0) {
                  device = deviceList.get(0);
               }
            }
         }
         if (device == null) {
            logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
            return;
         }
         // 兼容设备部分设备上报是通道编号与设备编号一致的情况
         if (deviceId.equals(channelId)) {
            List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
            if (deviceChannels.size() == 1) {
               channelId = deviceChannels.get(0).getChannelId();
            }
         }
         if (!ObjectUtils.isEmpty(device.getName())) {
            mobilePosition.setDeviceName(device.getName());
         }
         mobilePosition.setDeviceId(device.getDeviceId());
         mobilePosition.setChannelId(channelId);
         String time = XmlUtil.getText(rootElement, "Time");
         if (ObjectUtils.isEmpty(time)) {
            mobilePosition.setTime(DateUtil.getNow());
         } else {
            mobilePosition.setTime(SipUtils.parseTime(time));
         }
         mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
         mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
         if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
            mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
         } else {
            mobilePosition.setSpeed(0.0);
         }
         if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
            mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
         } else {
            mobilePosition.setDirection(0.0);
         }
         if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
            mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
         } else {
            mobilePosition.setAltitude(0.0);
         }
         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
               mobilePosition.getLongitude(), mobilePosition.getLatitude());
//         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//               mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
         mobilePosition.setReportSource("Mobile Position");
         // 更新device channel 的经纬度
@@ -149,13 +170,13 @@
         deviceChannel.setLongitude(mobilePosition.getLongitude());
         deviceChannel.setLatitude(mobilePosition.getLatitude());
         deviceChannel.setGpsTime(mobilePosition.getTime());
         updateChannelMap.put(channelId, deviceChannel);
         updateChannelMap.put(deviceId + channelId, deviceChannel);
         addMobilePositionList.add(mobilePosition);
         if(updateChannelMap.size() > 300) {
         if(updateChannelMap.size() > 100) {
            executeSaveChannel();
         }
         if (userSetting.isSavePositionHistory()) {
            if(addMobilePositionList.size() > 300) {
            if(addMobilePositionList.size() > 100) {
               executeSaveMobilePosition();
            }
         }
@@ -170,7 +191,7 @@
//         deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
         if (!dynamicTask.contains(talkKey)) {
            dynamicTask.startDelay(talkKey, this::executeSave, 1000);
            dynamicTask.startDelay(talkKey, this::executeSave, 3000);
         }
      } catch (DocumentException e) {
@@ -186,29 +207,29 @@
      dynamicTask.stop(talkKey);
   }
   // Flushes the pending channel GPS updates collected in updateChannelMap: logs how many
   // entries are pending, hands a copy of the map's values to
   // deviceChannelService.batchUpdateChannelGPS, then clears the map.
   //
   // NOTE(review): this span looks like diff residue. Two declarations of executeSaveChannel
   // (a private one and an @Async public one) appear back to back, and the batch-update call
   // appears both live and commented out. Confirm which variant is current.
   // NOTE(review): process() calls executeSaveChannel() directly on this instance; if the
   // @Async variant is the live one, a same-class call presumably bypasses the Spring proxy
   // and runs synchronously. Verify against the Spring configuration.
   private void executeSaveChannel(){
   @Async("taskExecutor")
   public void executeSaveChannel(){
      // NOTE(review): dynamicTask.execute() is not defined in this view; confirm what it runs.
      dynamicTask.execute();
      try {
         logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
         // Copy the current values so the batch call works on its own list rather than the map.
         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
         deviceChannelService.batchUpdateChannelGPS(deviceChannels);
//         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
//         deviceChannelService.batchUpdateChannelGPS(deviceChannels);
         // NOTE(review): anything put into updateChannelMap between the copy above and this
         // clear() would be dropped without being saved. Confirm whether concurrent writers exist.
         updateChannelMap.clear();
      }catch (Exception e) {
         // The exception is discarded here: nothing is logged or rethrown, and the map is left
         // uncleared when the failure happens before clear().
         // NOTE(review): consider at least logging the failure with its cause.
      }
   }
   // Persists the buffered mobile positions in addMobilePositionList, but only when
   // userSetting.isSavePositionHistory() is true: logs the buffer size, passes the list to
   // deviceChannelService.batchAddMobilePosition, then clears the list.
   //
   // NOTE(review): this span looks like diff residue. Two declarations of
   // executeSaveMobilePosition (a private one and an @Async public one) appear back to back,
   // and the log/batch-add pair appears both live and commented out. Confirm which variant
   // is current.
   private void executeSaveMobilePosition(){
   @Async("taskExecutor")
   public void executeSaveMobilePosition(){
      if (userSetting.isSavePositionHistory()) {
         try {
            logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
            // The shared list itself is passed to the service, not a copy.
            deviceChannelService.batchAddMobilePosition(addMobilePositionList);
//            logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
//            deviceChannelService.batchAddMobilePosition(addMobilePositionList);
            // NOTE(review): positions appended by another thread after the batch call starts
            // but before this clear() would be removed without being saved. Confirm whether
            // concurrent writers exist.
            addMobilePositionList.clear();
         }catch (Exception e) {
            // On failure only the buffer size is logged, at INFO level; the exception itself is
            // not passed to the logger, and the list is left uncleared.
            // NOTE(review): the leading "b" in the message below looks like a typo. Confirm
            // before changing it, since the string is emitted at runtime.
            logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
         }
      }
   }
}