648540858
2022-09-26 0cc5917d8acca044b330721cd235caedc2023967
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -29,10 +29,12 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
@@ -41,7 +43,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求
 * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的
 */
@Component
public class NotifyRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
@@ -77,7 +79,7 @@
   private boolean taskQueueHandlerRun = false;
   private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Qualifier("taskExecutor")
   @Autowired
@@ -93,7 +95,8 @@
   public void process(RequestEvent evt) {
      try {
         taskQueue.offer(new HandlerCatchData(evt, null, null));
         responseAck(evt, Response.OK);
         ServerTransaction serverTransaction = getServerTransaction(evt);
         responseAck(serverTransaction, Response.OK);
         if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
            taskExecutor.execute(()-> {
@@ -101,6 +104,10 @@
                  try {
                     HandlerCatchData take = taskQueue.poll();
                     Element rootElement = getRootElement(take.getEvt());
                     if (rootElement == null) {
                        logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                        continue;
                     }
                     String cmd = XmlUtil.getText(rootElement, "CmdType");
                     if (CmdType.CATALOG.equals(cmd)) {
@@ -116,14 +123,16 @@
                        logger.info("接收到消息:" + cmd);
                     }
                  } catch (DocumentException e) {
                     throw new RuntimeException(e);
                     logger.error("处理NOTIFY消息时错误", e);
                  }
               }
            taskQueueHandlerRun = false;
               taskQueueHandlerRun = false;
            });
         }
      } catch (SipException | InvalidArgumentException | ParseException e) {
         e.printStackTrace();
      } finally {
         taskQueueHandlerRun = false;
      }
   }
@@ -139,6 +148,10 @@
         // 回复 200 OK
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
            logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
            return;
         }
         MobilePosition mobilePosition = new MobilePosition();
         mobilePosition.setCreateTime(DateUtil.getNow());
@@ -146,7 +159,7 @@
         String channelId = deviceIdElement.getTextTrim().toString();
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device != null) {
            if (!StringUtils.isEmpty(device.getName())) {
            if (!ObjectUtils.isEmpty(device.getName())) {
               mobilePosition.setDeviceName(device.getName());
            }
         }
@@ -195,6 +208,7 @@
         }
         storager.updateChannelPosition(deviceChannel);
         // 发送redis消息。 通知位置信息的变化
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("time", time);
@@ -225,6 +239,10 @@
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         Element rootElement = getRootElement(evt);
         if (rootElement == null) {
            logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
            return;
         }
         Element deviceIdElement = rootElement.element("DeviceID");
         String channelId = deviceIdElement.getText().toString();
@@ -234,6 +252,10 @@
            return;
         }
         rootElement = getRootElement(evt, device.getCharset());
         if (rootElement == null) {
            logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
            return;
         }
         DeviceAlarm deviceAlarm = new DeviceAlarm();
         deviceAlarm.setDeviceId(deviceId);
         deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
@@ -269,8 +291,6 @@
            mobilePosition.setLatitude(deviceAlarm.getLatitude());
            mobilePosition.setReportSource("GPS Alarm");
            // 更新device channel 的经纬度
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(device.getDeviceId());
@@ -291,6 +311,18 @@
            }
            storager.updateChannelPosition(deviceChannel);
            // 发送redis消息。 通知位置信息的变化
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("time", mobilePosition.getTime());
            jsonObject.put("serial", deviceChannel.getDeviceId());
            jsonObject.put("code", deviceChannel.getChannelId());
            jsonObject.put("longitude", mobilePosition.getLongitude());
            jsonObject.put("latitude", mobilePosition.getLatitude());
            jsonObject.put("altitude", mobilePosition.getAltitude());
            jsonObject.put("direction", mobilePosition.getDirection());
            jsonObject.put("speed", mobilePosition.getSpeed());
            redisCatchStorage.sendMobilePositionMsg(jsonObject);
         }
         // TODO: 需要实现存储报警信息、报警分类
@@ -315,10 +347,14 @@
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device == null || device.getOnline() == 0) {
            logger.warn("[收到 目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
            logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
            return;
         }
         Element rootElement = getRootElement(evt, device.getCharset());
         if (rootElement == null) {
            logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
            return;
         }
         Element deviceListElement = rootElement.element("DeviceList");
         if (deviceListElement == null) {
            return;
@@ -336,14 +372,14 @@
               Element eventElement = itemDevice.element("Event");
               String event;
               if (eventElement == null) {
                  logger.warn("[收到 目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
                  logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
                  event = CatalogEvent.ADD;
               }else {
                  event = eventElement.getText().toUpperCase();
               }
               DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event);
               channel.setDeviceId(device.getDeviceId());
               logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
               logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
               switch (event) {
                  case CatalogEvent.ON:
                     // 上线