648540858
2023-04-23 269ad8cedbb07ca207a6f33af23085894dab4aa6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
@@ -11,7 +11,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
@@ -20,6 +19,7 @@
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
@@ -30,7 +30,6 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@@ -39,6 +38,7 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -76,13 +76,16 @@
   // Used by the mobile-position handling below to look up devices by channel id.
   @Autowired
   private IDeviceChannelService deviceChannelService;
   // Set to true in process() just before a queue-draining task is submitted and
   // reset to false in its finally blocks.
   private boolean taskQueueHandlerRun = false;
   // Receives the request event for Catalog notifications in process().
   @Autowired
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   // Queue of received NOTIFY events: process() offers to it and the task run on
   // taskExecutor polls from it.
   // NOTE(review): taskQueue is declared twice here (with and without `final`);
   // this looks like two revisions of the same line — confirm which one is current.
   private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   // Executor on which process() runs the queue-draining lambda.
   @Qualifier("taskExecutor")
   @Autowired
   private ThreadPoolTaskExecutor taskExecutor;
   // NOTE(review): not referenced in the visible part of this file; the queue-full
   // check in process() reads userSetting.getMaxNotifyCountQueue() instead —
   // confirm whether this field is still needed.
   private int maxQueueCount = 30000;
   @Override
   public void afterPropertiesSet() throws Exception {
@@ -93,51 +96,60 @@
   /**
    * Receives a NOTIFY request event, sends a response for it, puts it on
    * {@code taskQueue} and, when needed, submits a task to {@code taskExecutor}
    * that drains the queue and dispatches each message by its CmdType.
    *
    * NOTE(review): this span appears to contain two interleaved revisions of the
    * method body — the try/catch, the response call and the queue-draining loop
    * each occur twice and the braces do not balance. Confirm which lines are
    * current before relying on it.
    *
    * @param evt the request event; its request is answered here and later parsed
    *            for the CmdType element
    */
   @Override
   public void process(RequestEvent evt) {
      try {
         // Variant A: enqueue first, then answer with Response.OK unconditionally.
         taskQueue.offer(new HandlerCatchData(evt, null, null));
         responseAck(evt, Response.OK);
         // Variant A guards task submission with the taskQueueHandlerRun flag.
         if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
            taskExecutor.execute(()-> {
               while (!taskQueue.isEmpty()) {
                  try {
                     // No null check on the polled item in this variant.
                     HandlerCatchData take = taskQueue.poll();
                     Element rootElement = getRootElement(take.getEvt());
                     if (rootElement == null) {
                        logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                        continue;
                     }
                     // Dispatch on the text of the CmdType element.
                     String cmd = XmlUtil.getText(rootElement, "CmdType");
                     if (CmdType.CATALOG.equals(cmd)) {
                        logger.info("接收到Catalog通知");
                        processNotifyCatalogList(take.getEvt());
                     } else if (CmdType.ALARM.equals(cmd)) {
                        logger.info("接收到Alarm通知");
                        processNotifyAlarm(take.getEvt());
                     } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                        logger.info("接收到MobilePosition通知");
                        processNotifyMobilePosition(take.getEvt());
                     } else {
                        // Any other CmdType is only logged.
                        logger.info("接收到消息:" + cmd);
                     }
                  } catch (DocumentException e) {
                     logger.error("处理NOTIFY消息时错误", e);
                  } finally {
                     // The flag is cleared inside the loop, i.e. after every message.
                     taskQueueHandlerRun = false;
                  }
               }
            });
         // Variant B: when the queue already holds at least
         // userSetting.getMaxNotifyCountQueue() items, answer BUSY_HERE, log, and
         // return without enqueuing; otherwise answer OK.
         if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
            logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
         }else {
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
         }
      } catch (SipException | InvalidArgumentException | ParseException e) {
         e.printStackTrace();
      } finally {
         taskQueueHandlerRun = false;
      }catch (SipException | InvalidArgumentException | ParseException e) {
         // Variant B logs a failure to send the response and then continues below.
         logger.error("未处理的异常 ", e);
      }
      // Variant B: `runed` is true when the queue was non-empty BEFORE this event
      // is offered; a draining task is submitted only when it was empty.
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
            while (!taskQueue.isEmpty()) {
               try {
                  HandlerCatchData take = taskQueue.poll();
                  // poll() returns null when the queue is empty; skip and re-check
                  // the loop condition.
                  if (take == null) {
                     continue;
                  }
                  Element rootElement = getRootElement(take.getEvt());
                  if (rootElement == null) {
                     logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                     continue;
                  }
                  // Dispatch on the text of the CmdType element.
                  String cmd = XmlUtil.getText(rootElement, "CmdType");
                  if (CmdType.CATALOG.equals(cmd)) {
                     logger.info("接收到Catalog通知");
                     // Catalog notifications go to notifyRequestForCatalogProcessor;
                     // the call to the local handler is left commented out.
//                     processNotifyCatalogList(take.getEvt());
                     notifyRequestForCatalogProcessor.process(take.getEvt());
                  } else if (CmdType.ALARM.equals(cmd)) {
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
                  } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                     logger.info("接收到MobilePosition通知");
                     processNotifyMobilePosition(take.getEvt());
                  } else {
                     // Any other CmdType is only logged.
                     logger.info("接收到消息:" + cmd);
                  }
               } catch (DocumentException e) {
                  logger.error("处理NOTIFY消息时错误", e);
               }
            }
         });
      }
   }
   /**
    * Handles a MobilePosition (mobile position) NOTIFY.
    *
    * @param evt the request event carrying the NOTIFY to handle
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
@@ -154,15 +166,30 @@
         MobilePosition mobilePosition = new MobilePosition();
         mobilePosition.setCreateTime(DateUtil.getNow());
         Element deviceIdElement = rootElement.element("DeviceID");
         String channelId = deviceIdElement.getTextTrim().toString();
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device != null) {
            if (!ObjectUtils.isEmpty(device.getName())) {
               mobilePosition.setDeviceName(device.getName());
         if (device == null) {
            device = redisCatchStorage.getDevice(channelId);
            if (device == null) {
               // 根据通道id查询设备Id
               List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
               if (deviceList.size() > 0) {
                  device = deviceList.get(0);
               }
            }
         }
         mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID"));
         if (device == null) {
            logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
            return;
         }
         if (!ObjectUtils.isEmpty(device.getName())) {
            mobilePosition.setDeviceName(device.getName());
         }
         mobilePosition.setDeviceId(device.getDeviceId());
         mobilePosition.setChannelId(channelId);
         String time = XmlUtil.getText(rootElement, "Time");
         mobilePosition.setTime(time);
@@ -220,13 +247,13 @@
         jsonObject.put("speed", mobilePosition.getSpeed());
         redisCatchStorage.sendMobilePositionMsg(jsonObject);
      } catch (DocumentException  e) {
         e.printStackTrace();
         logger.error("未处理的异常 ", e);
      }
   }
   /**
    * Handles an Alarm (device alarm) NOTIFY.
    *
    * @param evt the request event carrying the NOTIFY to handle
    */
   private void processNotifyAlarm(RequestEvent evt) {
@@ -330,13 +357,13 @@
            publisher.deviceAlarmEventPublish(deviceAlarm);
         }
      } catch (DocumentException e) {
         e.printStackTrace();
         logger.error("未处理的异常 ", e);
      }
   }
   /**
    * Handles a Catalog (device catalog list) NOTIFY.
    *
    * @param evt the request event carrying the NOTIFY to handle
    */
   private void processNotifyCatalogList(RequestEvent evt) {
@@ -388,12 +415,20 @@
                  case CatalogEvent.OFF :
                     // 离线
                     logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     storager.deviceChannelOffline(deviceId, channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        storager.deviceChannelOffline(deviceId, channel.getChannelId());
                     }else {
                        logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }
                     break;
                  case CatalogEvent.VLOST:
                     // 视频丢失
                     logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     storager.deviceChannelOffline(deviceId, channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        storager.deviceChannelOffline(deviceId, channel.getChannelId());
                     }else {
                        logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }
                     break;
                  case CatalogEvent.DEFECT:
                     // 故障
@@ -423,7 +458,7 @@
            }
         }
      } catch (DocumentException e) {
         e.printStackTrace();
         logger.error("未处理的异常 ", e);
      }
   }