648540858
2024-04-22 8cba63642fcff122907bd7d7a8d7d822555d34ca
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,12 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -28,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -38,7 +37,6 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,7 @@
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   @Autowired
   private CivilCodeFileConf civilCodeFileConf;
   private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@@ -100,7 +98,6 @@
   @Override
   public void process(RequestEvent evt) {
      try {
         if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
            logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
@@ -113,10 +110,10 @@
         logger.error("未处理的异常 ", e);
      }
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
//            logger.warn("开始处理");
            while (!taskQueue.isEmpty()) {
               try {
                  HandlerCatchData take = taskQueue.poll();
@@ -137,8 +134,12 @@
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
                  } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                     logger.info("接收到MobilePosition通知");
                     processNotifyMobilePosition(take.getEvt());
//                     logger.info("接收到MobilePosition通知");
//                     processNotifyMobilePosition(take.getEvt());
                     taskExecutor.execute(() -> {
                        notifyRequestForMobilePositionProcessor.process(take.getEvt());
                     });
                  } else {
                     logger.info("接收到消息:" + cmd);
                  }
@@ -155,11 +156,11 @@
    *
    * @param evt
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
   @Async("taskExecutor")
   public void processNotifyMobilePosition(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
@@ -187,6 +188,13 @@
         if (device == null) {
            logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
            return;
         }
         // 兼容设备部分设备上报是通道编号与设备编号一致的情况
         if(deviceId.equals(channelId)) {
            List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
            if (deviceChannels.size() == 1) {
               channelId = deviceChannels.get(0).getChannelId();
            }
         }
         if (!ObjectUtils.isEmpty(device.getName())) {
            mobilePosition.setDeviceName(device.getName());
@@ -222,7 +230,6 @@
               mobilePosition.getLongitude(), mobilePosition.getLatitude());
         mobilePosition.setReportSource("Mobile Position");
         // 更新device channel 的经纬度
         DeviceChannel deviceChannel = new DeviceChannel();
         deviceChannel.setDeviceId(device.getDeviceId());
@@ -230,30 +237,15 @@
         deviceChannel.setLongitude(mobilePosition.getLongitude());
         deviceChannel.setLatitude(mobilePosition.getLatitude());
         deviceChannel.setGpsTime(mobilePosition.getTime());
         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//         deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
         mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
         mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
         mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
         mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
         deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
         if (userSetting.getSavePositionHistory()) {
            storager.insertMobilePosition(mobilePosition);
         }
         storager.updateChannelPosition(deviceChannel);
         // 发送redis消息。 通知位置信息的变化
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
         jsonObject.put("serial", deviceId);
         jsonObject.put("code", channelId);
         jsonObject.put("longitude", mobilePosition.getLongitude());
         jsonObject.put("latitude", mobilePosition.getLatitude());
         jsonObject.put("altitude", mobilePosition.getAltitude());
         jsonObject.put("direction", mobilePosition.getDirection());
         jsonObject.put("speed", mobilePosition.getSpeed());
         redisCatchStorage.sendMobilePositionMsg(jsonObject);
      } catch (DocumentException  e) {
         logger.error("未处理的异常 ", e);
      }
@@ -341,25 +333,8 @@
            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
            if (userSetting.getSavePositionHistory()) {
               storager.insertMobilePosition(mobilePosition);
            }
            storager.updateChannelPosition(deviceChannel);
            // 发送redis消息。 通知位置信息的变化
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
            jsonObject.put("serial", deviceChannel.getDeviceId());
            jsonObject.put("code", deviceChannel.getChannelId());
            jsonObject.put("longitude", mobilePosition.getLongitude());
            jsonObject.put("latitude", mobilePosition.getLatitude());
            jsonObject.put("altitude", mobilePosition.getAltitude());
            jsonObject.put("direction", mobilePosition.getDirection());
            jsonObject.put("speed", mobilePosition.getSpeed());
            redisCatchStorage.sendMobilePositionMsg(jsonObject);
            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
         }
         // TODO: 需要实现存储报警信息、报警分类
         // 回复200 OK
         if (redisCatchStorage.deviceIsOnline(deviceId)) {
@@ -394,4 +369,9 @@
   /**
    * Replaces the Redis-backed storage instance held by this processor.
    *
    * @param redisCatchStorage the storage instance to assign to the {@code redisCatchStorage} field
    */
   public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
      this.redisCatchStorage = redisCatchStorage;
   }
   @Scheduled(fixedRate = 1000)   //每1秒执行一次
   public void execute(){
      System.out.println("待处理消息数量: " + taskQueue.size());
   }
}