648540858
2024-05-29 764d04b497356ba6bcbb75fd42b51eca750f7223
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
@@ -20,15 +21,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -54,6 +51,9 @@
   @Autowired
   private IDeviceChannelService deviceChannelService;
   @Autowired
   private IMobilePositionService mobilePositionService;
   public void process(RequestEvent evt) {
      if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@@ -64,13 +64,10 @@
   }
   @Scheduled(fixedRate = 200) //每200毫秒执行一次
   @Transactional
   public void executeTaskQueue() {
      if (taskQueue.isEmpty()) {
         return;
      }
      Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
      List<MobilePosition> addMobilePositionList = new ArrayList<>();
      for (HandlerCatchData take : taskQueue) {
         if (take == null) {
            continue;
@@ -146,27 +143,18 @@
               }
            }
//         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//               mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
         logger.debug("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
               mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
            mobilePosition.setReportSource("Mobile Position");
            // 更新device channel 的经纬度
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(device.getDeviceId());
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
            addMobilePositionList.add(mobilePosition);
            mobilePositionService.add(mobilePosition);
            // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
            try {
               eventPublisher.mobilePositionEventPublish(mobilePosition);
            }catch (Exception e) {
               logger.error("[向上级转发移动位置失败] ", e);
            }
            if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) {
            if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId())) {
               List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
               channels.forEach(channel -> {
                  // 发送redis消息。 通知位置信息的变化
@@ -199,21 +187,6 @@
         }
      }
      taskQueue.clear();
      if(!updateChannelMap.isEmpty()) {
         List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
         logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
         deviceChannelService.batchUpdateChannel(channels);
         updateChannelMap.clear();
      }
      if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
         try {
            logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
            deviceChannelService.batchAddMobilePosition(addMobilePositionList);
         }catch (Exception e) {
            logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
         }
         addMobilePositionList.clear();
      }
   }
   @Scheduled(fixedRate = 10000)
   public void execute(){