648540858
2024-05-29 764d04b497356ba6bcbb75fd42b51eca750f7223
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -1,17 +1,17 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
@@ -19,16 +19,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -39,10 +37,7 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
   private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
   private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Autowired
   private UserSetting userSetting;
@@ -57,179 +52,144 @@
   private IDeviceChannelService deviceChannelService;
   @Autowired
   private DynamicTask dynamicTask;
   private IMobilePositionService mobilePositionService;
   @Autowired
   private CivilCodeFileConf civilCodeFileConf;
   @Autowired
   private SipConfig sipConfig;
   private final static String talkKey = "notify-request-for-mobile-position-task";
//   @Async("taskExecutor")
   public void process(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         long startTime = System.currentTimeMillis();
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
            logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
            return;
         }
         Device device = redisCatchStorage.getDevice(deviceId);
         MobilePosition mobilePosition = new MobilePosition();
         mobilePosition.setCreateTime(DateUtil.getNow());
         List<Element> elements = rootElement.elements();
         String channelId = null;
         for (Element element : elements) {
            switch (element.getName()){
               case "DeviceID":
                  channelId = element.getStringValue();
                  if (device == null) {
                     device = redisCatchStorage.getDevice(channelId);
                     if (device == null) {
                        // 根据通道id查询设备Id
                        List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                        if (!deviceList.isEmpty()) {
                           device = deviceList.get(0);
                        }
                     }
                  }
                  if (device == null) {
                     logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                     return;
                  }
                  mobilePosition.setDeviceId(device.getDeviceId());
                  mobilePosition.setChannelId(channelId);
                  // 兼容设备部分设备上报是通道编号与设备编号一致的情况
                  if (deviceId.equals(channelId)) {
                     List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                     if (deviceChannels.size() == 1) {
                        channelId = deviceChannels.get(0).getChannelId();
                     }
                  }
                  if (!ObjectUtils.isEmpty(device.getName())) {
                     mobilePosition.setDeviceName(device.getName());
                  }
                  mobilePosition.setDeviceId(device.getDeviceId());
                  mobilePosition.setChannelId(channelId);
                  continue;
               case "Time":
                  String timeVal = element.getStringValue();
                  if (ObjectUtils.isEmpty(timeVal)) {
                     mobilePosition.setTime(DateUtil.getNow());
                  } else {
                     mobilePosition.setTime(SipUtils.parseTime(timeVal));
                  }
                  continue;
               case "Longitude":
                  mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                  continue;
               case "Latitude":
                  mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                  continue;
               case "Speed":
                  String speedVal = element.getStringValue();
                  if (NumericUtil.isDouble(speedVal)) {
                     mobilePosition.setSpeed(Double.parseDouble(speedVal));
                  } else {
                     mobilePosition.setSpeed(0.0);
                  }
                  continue;
               case "Direction":
                  String directionVal = element.getStringValue();
                  if (NumericUtil.isDouble(directionVal)) {
                     mobilePosition.setDirection(Double.parseDouble(directionVal));
                  } else {
                     mobilePosition.setDirection(0.0);
                  }
                  continue;
               case "Altitude":
                  String altitudeVal = element.getStringValue();
                  if (NumericUtil.isDouble(altitudeVal)) {
                     mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                  } else {
                     mobilePosition.setAltitude(0.0);
                  }
                  continue;
            }
         }
//         logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//               mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
         mobilePosition.setReportSource("Mobile Position");
         // 更新device channel 的经纬度
         DeviceChannel deviceChannel = new DeviceChannel();
         deviceChannel.setDeviceId(device.getDeviceId());
         deviceChannel.setChannelId(channelId);
         deviceChannel.setLongitude(mobilePosition.getLongitude());
         deviceChannel.setLatitude(mobilePosition.getLatitude());
         deviceChannel.setGpsTime(mobilePosition.getTime());
         updateChannelMap.put(deviceId + channelId, deviceChannel);
         addMobilePositionList.add(mobilePosition);
         if(updateChannelMap.size() > 100) {
            executeSaveChannel();
         }
         if (userSetting.isSavePositionHistory()) {
            if(addMobilePositionList.size() > 100) {
               executeSaveMobilePosition();
            }
         }
//         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//         deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
         if (!dynamicTask.contains(talkKey)) {
            dynamicTask.startDelay(talkKey, this::executeSave, 3000);
         }
      } catch (DocumentException e) {
         logger.error("未处理的异常 ", e);
      if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
         logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
         return;
      }
      taskQueue.offer(new HandlerCatchData(evt, null, null));
   }
   private void executeSave(){
      executeSaveChannel();
      executeSaveMobilePosition();
      dynamicTask.stop(talkKey);
   }
   @Async("taskExecutor")
   public void executeSaveChannel(){
      dynamicTask.execute();
      try {
         logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
//         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
//         deviceChannelService.batchUpdateChannelGPS(deviceChannels);
         updateChannelMap.clear();
      }catch (Exception e) {
   @Scheduled(fixedRate = 200) //每200毫秒执行一次
   public void executeTaskQueue() {
      if (taskQueue.isEmpty()) {
         return;
      }
   }
   @Async("taskExecutor")
   public void executeSaveMobilePosition(){
      if (userSetting.isSavePositionHistory()) {
      for (HandlerCatchData take : taskQueue) {
         if (take == null) {
            continue;
         }
         RequestEvent evt = take.getEvt();
         try {
//            logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
//            deviceChannelService.batchAddMobilePosition(addMobilePositionList);
            addMobilePositionList.clear();
         }catch (Exception e) {
            logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            long startTime = System.currentTimeMillis();
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
               logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
               return;
            }
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
               logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
               return;
            }
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setDeviceId(device.getDeviceId());
            mobilePosition.setDeviceName(device.getName());
            mobilePosition.setCreateTime(DateUtil.getNow());
            List<Element> elements = rootElement.elements();
            for (Element element : elements) {
               switch (element.getName()){
                  case "DeviceID":
                     String channelId = element.getStringValue();
                     if (!deviceId.equals(channelId)) {
                        mobilePosition.setChannelId(channelId);
                     }
                     continue;
                  case "Time":
                     String timeVal = element.getStringValue();
                     if (ObjectUtils.isEmpty(timeVal)) {
                        mobilePosition.setTime(DateUtil.getNow());
                     } else {
                        mobilePosition.setTime(SipUtils.parseTime(timeVal));
                     }
                     continue;
                  case "Longitude":
                     mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                     continue;
                  case "Latitude":
                     mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                     continue;
                  case "Speed":
                     String speedVal = element.getStringValue();
                     if (NumericUtil.isDouble(speedVal)) {
                        mobilePosition.setSpeed(Double.parseDouble(speedVal));
                     } else {
                        mobilePosition.setSpeed(0.0);
                     }
                     continue;
                  case "Direction":
                     String directionVal = element.getStringValue();
                     if (NumericUtil.isDouble(directionVal)) {
                        mobilePosition.setDirection(Double.parseDouble(directionVal));
                     } else {
                        mobilePosition.setDirection(0.0);
                     }
                     continue;
                  case "Altitude":
                     String altitudeVal = element.getStringValue();
                     if (NumericUtil.isDouble(altitudeVal)) {
                        mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                     } else {
                        mobilePosition.setAltitude(0.0);
                     }
                     continue;
               }
            }
         logger.debug("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
               mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
            mobilePosition.setReportSource("Mobile Position");
            mobilePositionService.add(mobilePosition);
            // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
            try {
               eventPublisher.mobilePositionEventPublish(mobilePosition);
            }catch (Exception e) {
               logger.error("[向上级转发移动位置失败] ", e);
            }
            if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId())) {
               List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
               channels.forEach(channel -> {
                  // 发送redis消息。 通知位置信息的变化
                  JSONObject jsonObject = new JSONObject();
                  jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                  jsonObject.put("serial", channel.getDeviceId());
                  jsonObject.put("code", channel.getChannelId());
                  jsonObject.put("longitude", mobilePosition.getLongitude());
                  jsonObject.put("latitude", mobilePosition.getLatitude());
                  jsonObject.put("altitude", mobilePosition.getAltitude());
                  jsonObject.put("direction", mobilePosition.getDirection());
                  jsonObject.put("speed", mobilePosition.getSpeed());
                  redisCatchStorage.sendMobilePositionMsg(jsonObject);
               });
            }else {
               // 发送redis消息。 通知位置信息的变化
               JSONObject jsonObject = new JSONObject();
               jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
               jsonObject.put("serial", mobilePosition.getDeviceId());
               jsonObject.put("code", mobilePosition.getChannelId());
               jsonObject.put("longitude", mobilePosition.getLongitude());
               jsonObject.put("latitude", mobilePosition.getLatitude());
               jsonObject.put("altitude", mobilePosition.getAltitude());
               jsonObject.put("direction", mobilePosition.getDirection());
               jsonObject.put("speed", mobilePosition.getSpeed());
               redisCatchStorage.sendMobilePositionMsg(jsonObject);
            }
         } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
         }
      }
      taskQueue.clear();
   }
   @Scheduled(fixedRate = 10000)
   public void execute(){
      logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
   }
}