648540858
2024-04-22 8cba63642fcff122907bd7d7a8d7d822555d34ca
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -76,6 +78,9 @@
   // Injected collaborator for catalog notifications (not invoked in the visible portion of this file).
   @Autowired
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   // Injected collaborator whose process(evt) is called for MobilePosition notifications.
   @Autowired
   private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
   // Pending notify events: entries are added with offer() when a request arrives and
   // removed with poll() by the draining task submitted to taskExecutor.
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Qualifier("taskExecutor")
@@ -105,10 +110,10 @@
         logger.error("未处理的异常 ", e);
      }
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
//            logger.warn("开始处理");
            while (!taskQueue.isEmpty()) {
               try {
                  HandlerCatchData take = taskQueue.poll();
@@ -129,8 +134,12 @@
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
                  } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                     logger.info("接收到MobilePosition通知");
                     processNotifyMobilePosition(take.getEvt());
//                     logger.info("接收到MobilePosition通知");
//                     processNotifyMobilePosition(take.getEvt());
                     taskExecutor.execute(() -> {
                        notifyRequestForMobilePositionProcessor.process(take.getEvt());
                     });
                  } else {
                     logger.info("接收到消息:" + cmd);
                  }
@@ -147,11 +156,11 @@
    *
    * @param evt
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
   @Async("taskExecutor")
   public void processNotifyMobilePosition(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
@@ -360,4 +369,9 @@
   public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
      this.redisCatchStorage = redisCatchStorage;
   }
   @Scheduled(fixedRate = 1000)   //每1秒执行一次
   public void execute(){
      System.out.println("待处理消息数量: " + taskQueue.size());
   }
}