From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 309 +++------------------------------------------------ 1 files changed, 20 insertions(+), 289 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java old mode 100644 new mode 100755 index 7366f30..8da07a1 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,14 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; @@ -16,9 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -26,10 +19,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -37,9 +27,6 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰,杩欐槸浣滀负涓婄骇鍙戦�佽闃呰姹傚悗锛岃澶囨墠浼氬搷搴旂殑 @@ -49,15 +36,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private EventPublisher eventPublisher; @Autowired private SipConfig sipConfig; @@ -76,11 +54,11 @@ @Autowired private IDeviceChannelService deviceChannelService; - private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + + @Autowired + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; @Override public void afterPropertiesSet() throws Exception { @@ -92,147 +70,32 @@ public void process(RequestEvent evt) { try { responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); - }catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - boolean runed = !taskQueue.isEmpty(); - taskQueue.offer(new HandlerCatchData(evt, null, null)); - if (!runed) { - taskExecutor.execute(()-> { - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } catch (DocumentException e) { - logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); - } - } - }); - } - } - - /** - * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify - * - * @param evt - */ - private void processNotifyMobilePosition(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - - // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { - logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); return; } + String cmd = XmlUtil.getText(rootElement, "CmdType"); - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - // 鏍规嵁閫氶亾id鏌ヨ璁惧Id - List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - }else { - logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); - return; - } - } - if (device != null) { - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - } - mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID")); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - mobilePosition.setTime(time); - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); + if (CmdType.CATALOG.equals(cmd)) { + notifyRequestForCatalogProcessor.process(evt); + } else if (CmdType.ALARM.equals(cmd)) { + processNotifyAlarm(evt); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + notifyRequestForMobilePositionProcessor.process(evt); } else { - mobilePosition.setSpeed(0.0); + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); - mobilePosition.setReportSource("Mobile Position"); - - - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); - } - - storager.updateChannelPosition(deviceChannel); - - // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", time); - jsonObject.put("serial", deviceId); - jsonObject.put("code", channelId); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); - } catch (DocumentException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); + } catch (DocumentException e) { + throw new RuntimeException(e); } - } + } /*** * 澶勭悊alarm璁惧鎶ヨNotify - * - * @param evt */ private void processNotifyAlarm(RequestEvent evt) { if (!sipConfig.isAlarm()) { @@ -288,6 +151,7 @@ logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); @@ -310,25 +174,8 @@ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); - } - - storager.updateChannelPosition(deviceChannel); - // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); - jsonObject.put("serial", deviceChannel.getDeviceId()); - jsonObject.put("code", deviceChannel.getChannelId()); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); - + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } - // TODO: 闇�瑕佸疄鐜板瓨鍌ㄦ姤璀︿俊鎭�佹姤璀﹀垎绫� // 鍥炲200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { @@ -339,121 +186,5 @@ } } - /*** - * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * - * @param evt - */ - private void processNotifyCatalogList(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || device.getOnline() == 0) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event); - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 涓婄嚎 - logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOnline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.OFF : - // 绂荤嚎 - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.VLOST: - // 瑙嗛涓㈠け - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.DEFECT: - // 鏁呴殰 - break; - case CatalogEvent.ADD: - // 澧炲姞 - logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - case CatalogEvent.DEL: - // 鍒犻櫎 - logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.delChannel(deviceId, channel.getChannelId()); - break; - case CatalogEvent.UPDATE: - // 鏇存柊 - logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, event); - - } - } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - } - - public void setCmder(SIPCommander cmder) { - } - - public void setStorager(IVideoManagerStorage storager) { - this.storager = storager; - } - - public void setPublisher(EventPublisher publisher) { - this.publisher = publisher; - } - - public void setRedis(RedisUtil redis) { - } - - public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { - } - - public IRedisCatchStorage getRedisCatchStorage() { - return redisCatchStorage; - } - - public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { - this.redisCatchStorage = redisCatchStorage; - } } -- Gitblit v1.8.0