648540858
2024-01-10 c25a99d60bef3d3bbd59fee895bd658928fd00db
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
old mode 100644 new mode 100755
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
@@ -76,11 +77,19 @@
   @Autowired
   private IDeviceChannelService deviceChannelService;
   @Autowired
   private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
   @Autowired
   private CivilCodeFileConf civilCodeFileConf;
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Qualifier("taskExecutor")
   @Autowired
   private ThreadPoolTaskExecutor taskExecutor;
   private int maxQueueCount = 30000;
   @Override
   public void afterPropertiesSet() throws Exception {
@@ -91,17 +100,29 @@
   @Override
   public void process(RequestEvent evt) {
      try {
         responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
         if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
            logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
         }else {
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
         }
      }catch (SipException | InvalidArgumentException | ParseException e) {
         logger.error("未处理的异常 ", e);
      }
      boolean runed = !taskQueue.isEmpty();
      logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
      taskQueue.offer(new HandlerCatchData(evt, null, null));
      if (!runed) {
         taskExecutor.execute(()-> {
            while (!taskQueue.isEmpty()) {
               try {
                  HandlerCatchData take = taskQueue.poll();
                  if (take == null) {
                     continue;
                  }
                  Element rootElement = getRootElement(take.getEvt());
                  if (rootElement == null) {
                     logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
@@ -111,7 +132,7 @@
                  if (CmdType.CATALOG.equals(cmd)) {
                     logger.info("接收到Catalog通知");
                     processNotifyCatalogList(take.getEvt());
                     notifyRequestForCatalogProcessor.process(take.getEvt());
                  } else if (CmdType.ALARM.equals(cmd)) {
                     logger.info("接收到Alarm通知");
                     processNotifyAlarm(take.getEvt());
@@ -131,7 +152,7 @@
   /**
    * 处理MobilePosition移动位置Notify
    *
    *
    * @param evt
    */
   private void processNotifyMobilePosition(RequestEvent evt) {
@@ -174,7 +195,12 @@
         mobilePosition.setDeviceId(device.getDeviceId());
         mobilePosition.setChannelId(channelId);
         String time = XmlUtil.getText(rootElement, "Time");
         mobilePosition.setTime(time);
         if (ObjectUtils.isEmpty(time)){
            mobilePosition.setTime(DateUtil.getNow());
         }else {
            mobilePosition.setTime(SipUtils.parseTime(time));
         }
         mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
         mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
         if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
@@ -219,7 +245,7 @@
         // 发送redis消息。 通知位置信息的变化
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("time", time);
         jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
         jsonObject.put("serial", deviceId);
         jsonObject.put("code", channelId);
         jsonObject.put("longitude", mobilePosition.getLongitude());
@@ -235,7 +261,7 @@
   /***
    * 处理alarm设备报警Notify
    *
    *
    * @param evt
    */
   private void processNotifyAlarm(RequestEvent evt) {
@@ -292,6 +318,7 @@
         logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
         if ("4".equals(deviceAlarm.getAlarmMethod())) {
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setChannelId(channelId);
            mobilePosition.setCreateTime(DateUtil.getNow());
            mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
            mobilePosition.setTime(deviceAlarm.getAlarmTime());
@@ -321,7 +348,7 @@
            storager.updateChannelPosition(deviceChannel);
            // 发送redis消息。 通知位置信息的变化
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("time", mobilePosition.getTime());
            jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
            jsonObject.put("serial", deviceChannel.getDeviceId());
            jsonObject.put("code", deviceChannel.getChannelId());
            jsonObject.put("longitude", mobilePosition.getLongitude());
@@ -337,107 +364,6 @@
         // 回复200 OK
         if (redisCatchStorage.deviceIsOnline(deviceId)) {
            publisher.deviceAlarmEventPublish(deviceAlarm);
         }
      } catch (DocumentException e) {
         logger.error("未处理的异常 ", e);
      }
   }
   /***
    * 处理catalog设备目录列表Notify
    *
    * @param evt
    */
   private void processNotifyCatalogList(RequestEvent evt) {
      try {
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device == null || device.getOnline() == 0) {
            logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
            return;
         }
         Element rootElement = getRootElement(evt, device.getCharset());
         if (rootElement == null) {
            logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
            return;
         }
         Element deviceListElement = rootElement.element("DeviceList");
         if (deviceListElement == null) {
            return;
         }
         Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
         if (deviceListIterator != null) {
            // 遍历DeviceList
            while (deviceListIterator.hasNext()) {
               Element itemDevice = deviceListIterator.next();
               Element channelDeviceElement = itemDevice.element("DeviceID");
               if (channelDeviceElement == null) {
                  continue;
               }
               Element eventElement = itemDevice.element("Event");
               String event;
               if (eventElement == null) {
                  logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
                  event = CatalogEvent.ADD;
               }else {
                  event = eventElement.getText().toUpperCase();
               }
               DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event);
               channel.setDeviceId(device.getDeviceId());
               logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
               switch (event) {
                  case CatalogEvent.ON:
                     // 上线
                     logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     storager.deviceChannelOnline(deviceId, channel.getChannelId());
                     break;
                  case CatalogEvent.OFF :
                     // 离线
                     logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        storager.deviceChannelOffline(deviceId, channel.getChannelId());
                     }else {
                        logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }
                     break;
                  case CatalogEvent.VLOST:
                     // 视频丢失
                     logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        storager.deviceChannelOffline(deviceId, channel.getChannelId());
                     }else {
                        logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }
                     break;
                  case CatalogEvent.DEFECT:
                     // 故障
                     break;
                  case CatalogEvent.ADD:
                     // 增加
                     logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     deviceChannelService.updateChannel(deviceId, channel);
                     break;
                  case CatalogEvent.DEL:
                     // 删除
                     logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     storager.delChannel(deviceId, channel.getChannelId());
                     break;
                  case CatalogEvent.UPDATE:
                     // 更新
                     logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     deviceChannelService.updateChannel(deviceId, channel);
                     break;
                  default:
                     logger.warn("[ NotifyCatalog ] event not found : {}", event );
               }
               // 转发变化信息
               eventPublisher.catalogEventPublish(null, channel, event);
            }
         }
      } catch (DocumentException e) {
         logger.error("未处理的异常 ", e);