648540858
2024-04-23 901dee2bf4c91fa92306b5d8aa66b3148658186c
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -76,6 +78,9 @@
   @Autowired
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   @Autowired
   private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Qualifier("taskExecutor")
@@ -105,10 +110,10 @@
         logger.error("未处理的异常 ", e);
      }
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
//            logger.warn("开始处理");
            while (!taskQueue.isEmpty()) {
               try {
                  HandlerCatchData take = taskQueue.poll();
@@ -129,8 +134,12 @@
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
                  } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                     logger.info("接收到MobilePosition通知");
                     processNotifyMobilePosition(take.getEvt());
//                     logger.info("接收到MobilePosition通知");
//                     processNotifyMobilePosition(take.getEvt());
//                     taskExecutor.execute(() -> {
                        notifyRequestForMobilePositionProcessor.process(take.getEvt());
//                     });
                  } else {
                     logger.info("接收到消息:" + cmd);
                  }
@@ -147,11 +156,11 @@
    *
    * @param evt
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
   @Async("taskExecutor")
   public void processNotifyMobilePosition(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
@@ -217,8 +226,8 @@
         } else {
            mobilePosition.setAltitude(0.0);
         }
         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
               mobilePosition.getLongitude(), mobilePosition.getLatitude());
//         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//               mobilePosition.getLongitude(), mobilePosition.getLatitude());
         mobilePosition.setReportSource("Mobile Position");
         // 更新device channel 的经纬度
@@ -228,12 +237,12 @@
         deviceChannel.setLongitude(mobilePosition.getLongitude());
         deviceChannel.setLatitude(mobilePosition.getLatitude());
         deviceChannel.setGpsTime(mobilePosition.getTime());
         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
         deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
@@ -360,4 +369,9 @@
   /**
    * Sets the {@code redisCatchStorage} field of this processor.
    * The value is stored as given; no null check or copy is performed here.
    *
    * @param redisCatchStorage the storage instance to assign to the field
    */
   public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
      this.redisCatchStorage = redisCatchStorage;
   }
   @Scheduled(fixedRate = 10000)   //每1秒执行一次
   public void execute(){
      logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
   }
}