From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 289 +++++++++++++++++++-------------------------------------- 1 files changed, 98 insertions(+), 191 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index dcf823b..8da07a1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,12 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; @@ -14,9 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -24,9 +19,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -35,9 +27,6 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰,杩欐槸浣滀负涓婄骇鍙戦�佽闃呰姹傚悗锛岃澶囨墠浼氬搷搴旂殑 @@ -47,15 +36,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private EventPublisher eventPublisher; @Autowired private SipConfig sipConfig; @@ -80,14 +60,6 @@ @Autowired private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; - private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - private int maxQueueCount = 30000; - @Override public void afterPropertiesSet() throws Exception { // 娣诲姞娑堟伅澶勭悊鐨勮闃� @@ -97,187 +69,122 @@ @Override public void process(RequestEvent evt) { try { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { - responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); - logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); - return; - } else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + return; } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + if (CmdType.CATALOG.equals(cmd)) { + notifyRequestForCatalogProcessor.process(evt); + } else if (CmdType.ALARM.equals(cmd)) { + processNotifyAlarm(evt); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + notifyRequestForMobilePositionProcessor.process(evt); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); - } - taskQueue.offer(new HandlerCatchData(evt, null, null)); - } - @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� - public void executeTaskQueue(){ - if (taskQueue.isEmpty()) { - return; - } - try { - List<RequestEvent> catalogEventList = new ArrayList<>(); - List<RequestEvent> alarmEventList = new ArrayList<>(); - List<RequestEvent> mobilePositionEventList = new ArrayList<>(); - for (HandlerCatchData take : taskQueue) { - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - catalogEventList.add(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - alarmEventList.add(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - mobilePositionEventList.add(take.getEvt()); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } - taskQueue.clear(); - if (!alarmEventList.isEmpty()) { - processNotifyAlarm(alarmEventList); - } - if (!catalogEventList.isEmpty()) { - notifyRequestForCatalogProcessor.process(catalogEventList); - } - if (!mobilePositionEventList.isEmpty()) { - notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); - } } catch (DocumentException e) { - logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); + throw new RuntimeException(e); } - } + } /*** * 澶勭悊alarm璁惧鎶ヨNotify */ - private void processNotifyAlarm(List<RequestEvent> evtList) { + private void processNotifyAlarm(RequestEvent evt) { if (!sipConfig.isAlarm()) { return; } - if (!evtList.isEmpty()) { - for (RequestEvent evt : evtList) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); - - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - } - - // 鍥炲200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); + return; } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); + + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 鍥炲200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } } - public void setCmder(SIPCommander cmder) { - } - public void setStorager(IVideoManagerStorage storager) { - this.storager = storager; - } - - public void setPublisher(EventPublisher publisher) { - this.publisher = publisher; - } - - public void setRedis(RedisUtil redis) { - } - - public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { - } - - public IRedisCatchStorage getRedisCatchStorage() { - return redisCatchStorage; - } - - public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { - this.redisCatchStorage = redisCatchStorage; - } - - @Scheduled(fixedRate = 10000) //姣�1绉掓墽琛屼竴娆� - public void execute(){ - logger.info("[寰呭鐞哊otify娑堟伅鏁伴噺]: {}", taskQueue.size()); - } } -- Gitblit v1.8.0