648540858
2023-05-12 b498e2fcf21ee4f612dfaf0b45a945c52da37c60
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -76,11 +76,16 @@
   // Device-channel service, injected by Spring.
   @Autowired
   private IDeviceChannelService deviceChannelService;
   // Processor that Catalog NOTIFY events are delegated to from process().
   @Autowired
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   // Pending NOTIFY events: process() enqueues each incoming event here and a task
   // submitted to taskExecutor polls the queue until it is empty.
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   // Executor bean qualified as "taskExecutor"; runs the queue-draining task.
   @Qualifier("taskExecutor")
   @Autowired
   private ThreadPoolTaskExecutor taskExecutor;
   // NOTE(review): the queue-full check visible in process() reads
   // userSetting.getMaxNotifyCountQueue() rather than this field — confirm this
   // field is still used anywhere before relying on it.
   private int maxQueueCount = 30000;
   @Override
   public void afterPropertiesSet() throws Exception {
@@ -91,17 +96,29 @@
   @Override
   public void process(RequestEvent evt) {
      try {
         responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
         if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
            logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
         }else {
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
         }
      }catch (SipException | InvalidArgumentException | ParseException e) {
         logger.error("未处理的异常 ", e);
      }
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
            while (!taskQueue.isEmpty()) {
               try {
                  HandlerCatchData take = taskQueue.poll();
                  if (take == null) {
                     continue;
                  }
                  Element rootElement = getRootElement(take.getEvt());
                  if (rootElement == null) {
                     logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
@@ -111,7 +128,8 @@
                  if (CmdType.CATALOG.equals(cmd)) {
                     logger.info("接收到Catalog通知");
                     processNotifyCatalogList(take.getEvt());
//                     processNotifyCatalogList(take.getEvt());
                     notifyRequestForCatalogProcessor.process(take.getEvt());
                  } else if (CmdType.ALARM.equals(cmd)) {
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
@@ -131,7 +149,7 @@
   /**
    * Handles a MobilePosition (mobile position) NOTIFY.
    *
    * @param evt the request event carrying the NOTIFY to process
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
@@ -235,7 +253,7 @@
   /**
    * Handles an Alarm (device alarm) NOTIFY.
    *
    * @param evt the request event carrying the NOTIFY to process
    */
   private void processNotifyAlarm(RequestEvent evt) {
@@ -345,7 +363,7 @@
   /**
    * Handles a Catalog (device catalog list) NOTIFY.
    *
    * @param evt the request event carrying the NOTIFY to process
    */
   private void processNotifyCatalogList(RequestEvent evt) {