From 3496ca2378dccfbe37d2311e9d85e7c7b60726b0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 02 八月 2022 15:41:07 +0800 Subject: [PATCH] Merge pull request #353 from mrjackwang/wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 280 ++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 172 insertions(+), 108 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index c339598..4ce30a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,10 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -12,12 +11,14 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorager; -import com.genersoft.iot.vmp.utils.GpsUtil; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -25,6 +26,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -35,6 +38,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰 @@ -46,10 +50,10 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class); @Autowired - private UserSetup userSetup; + private UserSetting userSetting; @Autowired - private IVideoManagerStorager storager; + private IVideoManagerStorage storager; @Autowired private EventPublisher eventPublisher; @@ -63,14 +67,21 @@ @Autowired private EventPublisher publisher; - @Autowired - private DeviceOffLineDetector offLineDetector; - - - private String method = "NOTIFY"; + private final String method = "NOTIFY"; @Autowired private SIPProcessorObserver sipProcessorObserver; + + @Autowired + private IDeviceChannelService deviceChannelService; + + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { @@ -81,23 +92,37 @@ @Override public void process(RequestEvent evt) { try { - Element rootElement = getRootElement(evt); - String cmd = XmlUtil.getText(rootElement, "CmdType"); + taskQueue.offer(new HandlerCatchData(evt, null, null)); + responseAck(evt, Response.OK); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + Element rootElement = getRootElement(take.getEvt()); + String cmd = XmlUtil.getText(rootElement, "CmdType"); - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(evt); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(evt); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(evt); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - responseAck(evt, Response.OK); + if (CmdType.CATALOG.equals(cmd)) { + logger.info("鎺ユ敹鍒癈atalog閫氱煡"); + processNotifyCatalogList(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("鎺ユ敹鍒癆larm閫氱煡"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } + } catch (DocumentException e) { + throw new RuntimeException(e); + } + } + taskQueueHandlerRun = false; + }); } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } } @@ -109,11 +134,16 @@ */ private void processNotifyMobilePosition(RequestEvent evt) { try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + // 鍥炲 200 OK Element rootElement = getRootElement(evt); + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); Element deviceIdElement = rootElement.element("DeviceID"); - String deviceId = deviceIdElement.getTextTrim().toString(); + String channelId = deviceIdElement.getTextTrim().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { if (!StringUtils.isEmpty(device.getName())) { @@ -121,7 +151,9 @@ } } mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID")); - mobilePosition.setTime(XmlUtil.getText(rootElement, "Time")); + mobilePosition.setChannelId(channelId); + String time = XmlUtil.getText(rootElement, "Time"); + mobilePosition.setTime(time); mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { @@ -139,19 +171,42 @@ } else { mobilePosition.setAltitude(0.0); } + logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); - BaiduPoint bp = new BaiduPoint(); - bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); - logger.info("鐧惧害鍧愭爣锛�" + bp.getBdLng() + ", " + bp.getBdLat()); - mobilePosition.setGeodeticSystem("BD-09"); - mobilePosition.setCnLng(bp.getBdLng()); - mobilePosition.setCnLat(bp.getBdLat()); - if (!userSetup.getSavePositionHistory()) { - storager.clearMobilePositionsByDeviceId(deviceId); + + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); } - storager.insertMobilePosition(mobilePosition); - responseAck(evt, Response.OK); - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + + storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", time); + jsonObject.put("serial", deviceId); + jsonObject.put("code", channelId); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } catch (DocumentException e) { e.printStackTrace(); } } @@ -166,12 +221,16 @@ return; } try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Element rootElement = getRootElement(evt); Element deviceIdElement = rootElement.element("DeviceID"); - String deviceId = deviceIdElement.getText().toString(); + String channelId = deviceIdElement.getText().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { + logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); return; } rootElement = getRootElement(evt, device.getCharset()); @@ -179,7 +238,12 @@ deviceAlarm.setDeviceId(deviceId); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { deviceAlarm.setAlarmDescription(""); } else { @@ -195,33 +259,46 @@ } else { deviceAlarm.setLatitude(0.00); } - - if (deviceAlarm.getAlarmMethod().equals("4")) { + logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); - BaiduPoint bp = new BaiduPoint(); - bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); - logger.info("鐧惧害鍧愭爣锛�" + bp.getBdLng() + ", " + bp.getBdLat()); - mobilePosition.setGeodeticSystem("BD-09"); - mobilePosition.setCnLng(bp.getBdLng()); - mobilePosition.setCnLat(bp.getBdLat()); - if (!userSetup.getSavePositionHistory()) { - storager.clearMobilePositionsByDeviceId(deviceId); + + + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); } - storager.insertMobilePosition(mobilePosition); + + storager.updateChannelPosition(deviceChannel); } // TODO: 闇�瑕佸疄鐜板瓨鍌ㄦ姤璀︿俊鎭�佹姤璀﹀垎绫� // 鍥炲200 OK - responseAck(evt, Response.OK); - if (offLineDetector.isOnline(deviceId)) { + if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (DocumentException e) { e.printStackTrace(); } } @@ -236,14 +313,12 @@ FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { + if (device == null || device.getOnline() == 0) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); return; } - if (device != null ) { - rootElement = getRootElement(evt, device.getCharset()); - } + Element rootElement = getRootElement(evt, device.getCharset()); Element deviceListElement = rootElement.element("DeviceList"); if (deviceListElement == null) { return; @@ -259,67 +334,60 @@ continue; } Element eventElement = itemDevice.element("Event"); - DeviceChannel channel = XmlUtil.channelContentHander(itemDevice); + String event; + if (eventElement == null) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); + } + DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event); channel.setDeviceId(device.getDeviceId()); - logger.debug("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾: {}銆恵}銆�", device.getDeviceId(), channel.getName(), channel.getChannelId()); - switch (eventElement.getText().toUpperCase()) { - case CatalogEvent.ON: // 涓婄嚎 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾銆恵}銆戜笂绾块�氱煡", device.getDeviceId(), channel.getChannelId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 涓婄嚎 + logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOnline(deviceId, channel.getChannelId()); - // 鍥炲200 OK - responseAck(evt, Response.OK); break; - case CatalogEvent.OFF : // 绂荤嚎 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾銆恵}銆戠绾块�氱煡", device.getDeviceId(), channel.getChannelId()); + case CatalogEvent.OFF : + // 绂荤嚎 + logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); - // 鍥炲200 OK - responseAck(evt, Response.OK); break; - case CatalogEvent.VLOST: // 瑙嗛涓㈠け - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾銆恵}銆戣棰戜涪澶遍�氱煡", device.getDeviceId(), channel.getChannelId()); + case CatalogEvent.VLOST: + // 瑙嗛涓㈠け + logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); - // 鍥炲200 OK - responseAck(evt, Response.OK); break; - case CatalogEvent.DEFECT: // 鏁呴殰 - // 鍥炲200 OK - responseAck(evt, Response.OK); + case CatalogEvent.DEFECT: + // 鏁呴殰 break; - case CatalogEvent.ADD: // 澧炲姞 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑澧炲姞閫氶亾銆恵}銆戦�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.updateChannel(deviceId, channel); - responseAck(evt, Response.OK); + case CatalogEvent.ADD: + // 澧炲姞 + logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + deviceChannelService.updateChannel(deviceId, channel); break; - case CatalogEvent.DEL: // 鍒犻櫎 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑鍒犻櫎閫氶亾銆恵}銆戦�氱煡", device.getDeviceId(), channel.getChannelId()); + case CatalogEvent.DEL: + // 鍒犻櫎 + logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); storager.delChannel(deviceId, channel.getChannelId()); - responseAck(evt, Response.OK); break; - case CatalogEvent.UPDATE: // 鏇存柊 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑鏇存柊閫氶亾銆恵}銆戦�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.updateChannel(deviceId, channel); - responseAck(evt, Response.OK); + case CatalogEvent.UPDATE: + // 鏇存柊 + logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + deviceChannelService.updateChannel(deviceId, channel); break; default: - responseAck(evt, Response.BAD_REQUEST, "event not found"); + logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); } // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase()); + eventPublisher.catalogEventPublish(null, channel, event); - } - - // RequestMessage msg = new RequestMessage(); - // msg.setDeviceId(deviceId); - // msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG); - // msg.setData(device); - // deferredResultHolder.invokeResult(msg); - - if (offLineDetector.isOnline(deviceId)) { - publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); } } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (DocumentException e) { e.printStackTrace(); } } @@ -327,7 +395,7 @@ public void setCmder(SIPCommander cmder) { } - public void setStorager(IVideoManagerStorager storager) { + public void setStorager(IVideoManagerStorage storager) { this.storager = storager; } @@ -339,10 +407,6 @@ } public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { - } - - public void setOffLineDetector(DeviceOffLineDetector offLineDetector) { - this.offLineDetector = offLineDetector; } public IRedisCatchStorage getRedisCatchStorage() { -- Gitblit v1.8.0