648540858
2024-04-24 2113e8cf271e0d189d4ff9dd2d4d5dd7cba6e3ab
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -4,6 +4,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -17,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
@@ -27,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -37,6 +40,7 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Autowired
   private UserSetting userSetting;
@@ -50,14 +54,28 @@
   @Autowired
   private IDeviceChannelService deviceChannelService;
   public void process(RequestEvent evt) {
      if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
         logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
         return;
      }
      taskQueue.offer(new HandlerCatchData(evt, null, null));
   }
   @Scheduled(fixedRate = 200) //每200毫秒执行一次
   @Transactional
   public void process(List<RequestEvent> eventList) {
      if (eventList.isEmpty()) {
   public void executeTaskQueue() {
      if (taskQueue.isEmpty()) {
         return;
      }
      Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
      List<MobilePosition> addMobilePositionList = new ArrayList<>();
      for (RequestEvent evt : eventList) {
      for (HandlerCatchData take : taskQueue) {
         if (take == null) {
            continue;
         }
         RequestEvent evt = take.getEvt();
         try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -180,10 +198,11 @@
            logger.error("未处理的异常 ", e);
         }
      }
      taskQueue.clear();
      if(!updateChannelMap.isEmpty()) {
         List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
         logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
         deviceChannelService.batchUpdateChannelGPS(channels);
         deviceChannelService.batchUpdateChannel(channels);
         updateChannelMap.clear();
      }
      if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
@@ -196,4 +215,8 @@
         addMobilePositionList.clear();
      }
   }
   @Scheduled(fixedRate = 10000)
   public void execute(){
      logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
   }
}