648540858
2024-05-29 764d04b497356ba6bcbb75fd42b51eca750f7223
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -18,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -28,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -45,6 +48,8 @@
   private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
   private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Autowired
   private UserSetting userSetting;
@@ -65,11 +70,24 @@
   private SipConfig sipConfig;
   @Transactional
   public void process(List<RequestEvent> evtList) {
      if (evtList.isEmpty()) {
   public void process(RequestEvent evt) {
      if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
         logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
         return;
      }
      for (RequestEvent evt : evtList) {
      taskQueue.offer(new HandlerCatchData(evt, null, null));
   }
   @Scheduled(fixedRate = 400)   //每400毫秒执行一次
   public void executeTaskQueue(){
      if (taskQueue.isEmpty()) {
         return;
      }
      for (HandlerCatchData take : taskQueue) {
         if (take == null) {
            continue;
         }
         RequestEvent evt = take.getEvt();
         try {
            long start = System.currentTimeMillis();
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -221,6 +239,7 @@
            logger.error("未处理的异常 ", e);
         }
      }
      taskQueue.clear();
      if (!updateChannelMap.keySet().isEmpty()
            || !addChannelMap.keySet().isEmpty()
            || !updateChannelOnlineList.isEmpty()
@@ -295,4 +314,9 @@
         updateChannelOfflineList.clear();
      }
   }
   @Scheduled(fixedRate = 10000)   //每1秒执行一次
   public void execute(){
      logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size());
   }
}