648540858
2024-04-23 66a681d67969ae64c5cfe7140cb60a7885edef86
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -37,6 +37,7 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -111,12 +112,17 @@
      }
      boolean runed = !taskQueue.isEmpty();
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
//            logger.warn("开始处理");
            while (!taskQueue.isEmpty()) {
   }
   @Scheduled(fixedRate = 200)   //每200毫秒执行一次
   public void executeTaskQueue(){
      if (taskQueue.isEmpty()) {
         return;
      }
               try {
                  HandlerCatchData take = taskQueue.poll();
         List<RequestEvent> catalogEventList = new ArrayList<>();
         List<RequestEvent> alarmEventList = new ArrayList<>();
         List<RequestEvent> mobilePositionEventList = new ArrayList<>();
         for (HandlerCatchData take : taskQueue) {
                  if (take == null) {
                     continue;
                  }
@@ -128,28 +134,31 @@
                  String cmd = XmlUtil.getText(rootElement, "CmdType");
                  if (CmdType.CATALOG.equals(cmd)) {
                     logger.info("接收到Catalog通知");
                     notifyRequestForCatalogProcessor.process(take.getEvt());
               catalogEventList.add(take.getEvt());
                  } else if (CmdType.ALARM.equals(cmd)) {
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
               alarmEventList.add(take.getEvt());
                  } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
//                     logger.info("接收到MobilePosition通知");
//                     processNotifyMobilePosition(take.getEvt());
//                     taskExecutor.execute(() -> {
                        notifyRequestForMobilePositionProcessor.process(take.getEvt());
//                     });
               mobilePositionEventList.add(take.getEvt());
                  } else {
                     logger.info("接收到消息:" + cmd);
            }
         }
         taskQueue.clear();
         if (!alarmEventList.isEmpty()) {
            processNotifyAlarm(alarmEventList);
         }
         if (!catalogEventList.isEmpty()) {
            notifyRequestForCatalogProcessor.process(catalogEventList);
         }
         if (!mobilePositionEventList.isEmpty()) {
            notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
                  }
               } catch (DocumentException e) {
                  logger.error("处理NOTIFY消息时错误", e);
               }
            }
         });
      }
   }
   /**
    * 处理MobilePosition移动位置Notify
@@ -253,13 +262,13 @@
   /**
    * Handles alarm (device alarm) NOTIFY requests.
    *
    * @param evtList the NOTIFY request events to process
    */
   private void processNotifyAlarm(RequestEvent evt) {
   private void processNotifyAlarm(List<RequestEvent> evtList) {
      if (!sipConfig.isAlarm()) {
         return;
      }
      if (!evtList.isEmpty()) {
         for (RequestEvent evt : evtList) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -344,6 +353,8 @@
         logger.error("未处理的异常 ", e);
      }
   }
      }
   }
   /**
    * Accepts a {@link SIPCommander}; the method body is empty, so the
    * supplied commander is neither stored nor used by this method.
    *
    * @param cmder the commander instance; currently ignored
    */
   public void setCmder(SIPCommander cmder) {
   }