648540858
2024-04-24 2113e8cf271e0d189d4ff9dd2d4d5dd7cba6e3ab
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,12 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
@@ -14,9 +11,7 @@
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -24,9 +19,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
@@ -35,9 +27,6 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的
@@ -47,15 +36,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
   @Autowired
   private UserSetting userSetting;
   @Autowired
   private IVideoManagerStorage storager;
   @Autowired
   private EventPublisher eventPublisher;
   @Autowired
   private SipConfig sipConfig;
@@ -80,14 +60,6 @@
   @Autowired
   private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Qualifier("taskExecutor")
   @Autowired
   private ThreadPoolTaskExecutor taskExecutor;
   private int maxQueueCount = 30000;
   @Override
   public void afterPropertiesSet() throws Exception {
      // 添加消息处理的订阅
@@ -97,73 +69,38 @@
   @Override
   public void process(RequestEvent evt) {
      try {
         if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
            logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
         } else {
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
         }
      } catch (SipException | InvalidArgumentException | ParseException e) {
         logger.error("未处理的异常 ", e);
      }
      taskQueue.offer(new HandlerCatchData(evt, null, null));
   }
   @Scheduled(fixedRate = 200)   //每200毫秒执行一次
   public void executeTaskQueue(){
      if (taskQueue.isEmpty()) {
         return;
      }
      try {
         List<RequestEvent> catalogEventList = new ArrayList<>();
         List<RequestEvent> alarmEventList = new ArrayList<>();
         List<RequestEvent> mobilePositionEventList = new ArrayList<>();
         for (HandlerCatchData take : taskQueue) {
            if (take == null) {
               continue;
            }
            Element rootElement = getRootElement(take.getEvt());
         Element rootElement = getRootElement(evt);
            if (rootElement == null) {
               logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
               continue;
            logger.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest());
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
            return;
            }
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            if (CmdType.CATALOG.equals(cmd)) {
               catalogEventList.add(take.getEvt());
            notifyRequestForCatalogProcessor.process(evt);
            } else if (CmdType.ALARM.equals(cmd)) {
               alarmEventList.add(take.getEvt());
            processNotifyAlarm(evt);
            } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
               mobilePositionEventList.add(take.getEvt());
            notifyRequestForMobilePositionProcessor.process(evt);
            } else {
               logger.info("接收到消息:" + cmd);
            }
         }
         taskQueue.clear();
         if (!alarmEventList.isEmpty()) {
            processNotifyAlarm(alarmEventList);
         }
         if (!catalogEventList.isEmpty()) {
            notifyRequestForCatalogProcessor.process(catalogEventList);
         }
         if (!mobilePositionEventList.isEmpty()) {
            notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
         }
      } catch (SipException | InvalidArgumentException | ParseException e) {
         logger.error("未处理的异常 ", e);
      } catch (DocumentException e) {
         logger.error("处理NOTIFY消息时错误", e);
      }
         throw new RuntimeException(e);
   }
   }
   /***
    * 处理alarm设备报警Notify
    */
   private void processNotifyAlarm(List<RequestEvent> evtList) {
   private void processNotifyAlarm(RequestEvent evt) {
      if (!sipConfig.isAlarm()) {
         return;
      }
      if (!evtList.isEmpty()) {
         for (RequestEvent evt : evtList) {
            try {
               FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
               String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -248,36 +185,6 @@
               logger.error("未处理的异常 ", e);
            }
         }
      }
   }
   public void setCmder(SIPCommander cmder) {
   }
   public void setStorager(IVideoManagerStorage storager) {
      this.storager = storager;
   }
   public void setPublisher(EventPublisher publisher) {
      this.publisher = publisher;
   }
   public void setRedis(RedisUtil redis) {
   }
   public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
   }
   public IRedisCatchStorage getRedisCatchStorage() {
      return redisCatchStorage;
   }
   public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
      this.redisCatchStorage = redisCatchStorage;
   }
   @Scheduled(fixedRate = 10000)   //每1秒执行一次
   public void execute(){
      logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
   }
}