From b15e559eae82db811b31f1e3c47e3be027f20e27 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 08 一月 2024 16:18:28 +0800 Subject: [PATCH] Merge branch '2.6.9' into wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 350 +++++++++++++++++++++++++++++++--------------------------- 1 files changed, 188 insertions(+), 162 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java old mode 100644 new mode 100755 index 680be66..d35c6a6 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,11 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -13,22 +12,25 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.GpsUtil; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; +import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -37,9 +39,11 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; /** - * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰 + * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰,杩欐槸浣滀负涓婄骇鍙戦�佽闃呰姹傚悗锛岃澶囨墠浼氬搷搴旂殑 */ @Component public class NotifyRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { @@ -65,14 +69,27 @@ @Autowired private EventPublisher publisher; - @Autowired - private DeviceOffLineDetector offLineDetector; - - - private String method = "NOTIFY"; + private final String method = "NOTIFY"; @Autowired private SIPProcessorObserver sipProcessorObserver; + + @Autowired + private IDeviceChannelService deviceChannelService; + + @Autowired + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + + @Autowired + private CivilCodeFileConf civilCodeFileConf; + + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + private int maxQueueCount = 30000; @Override public void afterPropertiesSet() throws Exception { @@ -83,30 +100,59 @@ @Override public void process(RequestEvent evt) { try { - Element rootElement = getRootElement(evt); - String cmd = XmlUtil.getText(rootElement, "CmdType"); - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(evt); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(evt); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(evt); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - responseAck(evt, Response.OK); + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); + logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + }else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); + + }catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + boolean runed = !taskQueue.isEmpty(); + logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); + taskQueue.offer(new HandlerCatchData(evt, null, null)); + if (!runed) { + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + if (take == null) { + continue; + } + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + logger.info("鎺ユ敹鍒癈atalog閫氱煡"); + notifyRequestForCatalogProcessor.process(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("鎺ユ敹鍒癆larm閫氱煡"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } + } catch (DocumentException e) { + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); + } + } + }); } } /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify - * + * * @param evt */ private void processNotifyMobilePosition(RequestEvent evt) { @@ -116,20 +162,45 @@ // 鍥炲 200 OK Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + return; + } MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getTextTrim().toString(); Device device = redisCatchStorage.getDevice(deviceId); - if (device != null) { - if (!StringUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); + + if (device == null) { + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 鏍规嵁閫氶亾id鏌ヨ璁惧Id + List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (deviceList.size() > 0) { + device = deviceList.get(0); + } } } - mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID")); + if (device == null) { + logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); + return; + } + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + + mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); String time = XmlUtil.getText(rootElement, "Time"); - mobilePosition.setTime(time); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } + mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { @@ -147,23 +218,34 @@ } else { mobilePosition.setAltitude(0.0); } - logger.info("[鏀跺埌Notify-MobilePosition]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); - // 榛樿鏉ユ簮鍧愭爣绯讳负WGS-84澶勭悊 - Double[] gcj02Point = Coordtransform.WGS84ToGCJ02(mobilePosition.getLongitude(), mobilePosition.getLatitude()); - logger.info("GCJ02鍧愭爣锛�" + gcj02Point[0] + ", " + gcj02Point[1]); - mobilePosition.setGeodeticSystem("GCJ-02"); - mobilePosition.setCnLng(gcj02Point[0] + ""); - mobilePosition.setCnLat(gcj02Point[1] + ""); - if (!userSetting.getSavePositionHistory()) { - storager.clearMobilePositionsByDeviceId(deviceId); + + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); } - storager.insertMobilePosition(mobilePosition); - storager.updateChannelPotion(deviceId, channelId, mobilePosition.getLongitude(), mobilePosition.getLatitude() ); + + storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", time); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceId); jsonObject.put("code", channelId); jsonObject.put("longitude", mobilePosition.getLongitude()); @@ -172,15 +254,14 @@ jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); - responseAck(evt, Response.OK); - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } } /*** * 澶勭悊alarm璁惧鎶ヨNotify - * + * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { @@ -188,20 +269,37 @@ return; } try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); + return; + } Element deviceIdElement = rootElement.element("DeviceID"); - String deviceId = deviceIdElement.getText().toString(); + String channelId = deviceIdElement.getText().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { + logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); return; } rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setDeviceId(deviceId); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { deviceAlarm.setAlarmDescription(""); } else { @@ -218,125 +316,57 @@ deviceAlarm.setLatitude(0.00); } logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if (deviceAlarm.getAlarmMethod().equals("4")) { + if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); - // 榛樿鏉ユ簮鍧愭爣绯讳负WGS-84澶勭悊 - Double[] gcj02Point = Coordtransform.WGS84ToGCJ02(mobilePosition.getLongitude(), mobilePosition.getLatitude()); - logger.info("GCJ02鍧愭爣锛�" + gcj02Point[0] + ", " + gcj02Point[1]); - mobilePosition.setGeodeticSystem("GCJ-02"); - mobilePosition.setCnLng(gcj02Point[0] + ""); - mobilePosition.setCnLat(gcj02Point[1] + ""); - if (!userSetting.getSavePositionHistory()) { - storager.clearMobilePositionsByDeviceId(deviceId); + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); } - storager.insertMobilePosition(mobilePosition); + + storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", deviceChannel.getDeviceId()); + jsonObject.put("code", deviceChannel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } // TODO: 闇�瑕佸疄鐜板瓨鍌ㄦ姤璀︿俊鎭�佹姤璀﹀垎绫� // 鍥炲200 OK - responseAck(evt, Response.OK); - if (offLineDetector.isOnline(deviceId)) { + if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); - } - } - - /*** - * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * - * @param evt - */ - private void processNotifyCatalogList(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - - Element rootElement = getRootElement(evt); - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - return; - } - if (device != null ) { - rootElement = getRootElement(evt, device.getCharset()); - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - DeviceChannel channel = XmlUtil.channelContentHander(itemDevice); - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌Notify-Catalog]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (eventElement.getText().toUpperCase()) { - case CatalogEvent.ON: // 涓婄嚎 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾銆恵}銆戜笂绾块�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOnline(deviceId, channel.getChannelId()); - // 鍥炲200 OK - responseAck(evt, Response.OK); - break; - case CatalogEvent.OFF : // 绂荤嚎 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾銆恵}銆戠绾块�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - // 鍥炲200 OK - responseAck(evt, Response.OK); - break; - case CatalogEvent.VLOST: // 瑙嗛涓㈠け - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾銆恵}銆戣棰戜涪澶遍�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - // 鍥炲200 OK - responseAck(evt, Response.OK); - break; - case CatalogEvent.DEFECT: // 鏁呴殰 - // 鍥炲200 OK - responseAck(evt, Response.OK); - break; - case CatalogEvent.ADD: // 澧炲姞 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑澧炲姞閫氶亾銆恵}銆戦�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.updateChannel(deviceId, channel); - responseAck(evt, Response.OK); - break; - case CatalogEvent.DEL: // 鍒犻櫎 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑鍒犻櫎閫氶亾銆恵}銆戦�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.delChannel(deviceId, channel.getChannelId()); - responseAck(evt, Response.OK); - break; - case CatalogEvent.UPDATE: // 鏇存柊 - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑鏇存柊閫氶亾銆恵}銆戦�氱煡", device.getDeviceId(), channel.getChannelId()); - storager.updateChannel(deviceId, channel); - responseAck(evt, Response.OK); - break; - default: - responseAck(evt, Response.BAD_REQUEST, "event not found"); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase()); - - } - - if (!offLineDetector.isOnline(deviceId)) { - publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); - } - } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } } @@ -355,10 +385,6 @@ } public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { - } - - public void setOffLineDetector(DeviceOffLineDetector offLineDetector) { - this.offLineDetector = offLineDetector; } public IRedisCatchStorage getRedisCatchStorage() { -- Gitblit v1.8.0