648540858
2024-04-23 d41d6b34af2485198ed01e1888db1571e4da1a6a
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -35,6 +37,7 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -76,6 +79,9 @@
   @Autowired
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   @Autowired
   private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Qualifier("taskExecutor")
@@ -105,13 +111,18 @@
         logger.error("未处理的异常 ", e);
      }
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
            while (!taskQueue.isEmpty()) {
   }
   @Scheduled(fixedRate = 200)   //每200毫秒执行一次
   public void executeTaskQueue(){
      if (taskQueue.isEmpty()) {
         return;
      }
               try {
                  HandlerCatchData take = taskQueue.poll();
         List<RequestEvent> catalogEventList = new ArrayList<>();
         List<RequestEvent> alarmEventList = new ArrayList<>();
         List<RequestEvent> mobilePositionEventList = new ArrayList<>();
         for (HandlerCatchData take : taskQueue) {
                  if (take == null) {
                     continue;
                  }
@@ -123,35 +134,42 @@
                  String cmd = XmlUtil.getText(rootElement, "CmdType");
                  if (CmdType.CATALOG.equals(cmd)) {
                     logger.info("接收到Catalog通知");
                     notifyRequestForCatalogProcessor.process(take.getEvt());
               catalogEventList.add(take.getEvt());
                  } else if (CmdType.ALARM.equals(cmd)) {
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
               alarmEventList.add(take.getEvt());
                  } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                     logger.info("接收到MobilePosition通知");
                     processNotifyMobilePosition(take.getEvt());
               mobilePositionEventList.add(take.getEvt());
                  } else {
                     logger.info("接收到消息:" + cmd);
            }
         }
         taskQueue.clear();
         if (!alarmEventList.isEmpty()) {
            processNotifyAlarm(alarmEventList);
         }
         if (!catalogEventList.isEmpty()) {
            notifyRequestForCatalogProcessor.process(catalogEventList);
         }
         if (!mobilePositionEventList.isEmpty()) {
            notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
                  }
               } catch (DocumentException e) {
                  logger.error("处理NOTIFY消息时错误", e);
               }
            }
         });
      }
   }
   /**
    * 处理MobilePosition移动位置Notify
    *
    * @param evt
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
   @Async("taskExecutor")
   public void processNotifyMobilePosition(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
@@ -179,6 +197,13 @@
         if (device == null) {
            logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
            return;
         }
         // 兼容设备部分设备上报是通道编号与设备编号一致的情况
         if(deviceId.equals(channelId)) {
            List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
            if (deviceChannels.size() == 1) {
               channelId = deviceChannels.get(0).getChannelId();
            }
         }
         if (!ObjectUtils.isEmpty(device.getName())) {
            mobilePosition.setDeviceName(device.getName());
@@ -210,8 +235,8 @@
         } else {
            mobilePosition.setAltitude(0.0);
         }
         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
               mobilePosition.getLongitude(), mobilePosition.getLatitude());
//         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//               mobilePosition.getLongitude(), mobilePosition.getLatitude());
         mobilePosition.setReportSource("Mobile Position");
         // 更新device channel 的经纬度
@@ -221,12 +246,12 @@
         deviceChannel.setLongitude(mobilePosition.getLongitude());
         deviceChannel.setLatitude(mobilePosition.getLatitude());
         deviceChannel.setGpsTime(mobilePosition.getTime());
         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
         deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
@@ -237,13 +262,13 @@
   /***
    * 处理alarm设备报警Notify
    *
    * @param evt
    */
   private void processNotifyAlarm(RequestEvent evt) {
   private void processNotifyAlarm(List<RequestEvent> evtList) {
      if (!sipConfig.isAlarm()) {
         return;
      }
      if (!evtList.isEmpty()) {
         for (RequestEvent evt : evtList) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -328,6 +353,8 @@
         logger.error("未处理的异常 ", e);
      }
   }
      }
   }
   public void setCmder(SIPCommander cmder) {
   }
@@ -353,4 +380,9 @@
   public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
      this.redisCatchStorage = redisCatchStorage;
   }
   @Scheduled(fixedRate = 10000)   //每1秒执行一次
   public void execute(){
      logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
   }
}