From 1af77ab5f7c11a4b3d59c1989b51b9fca29679ce Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 18 十月 2022 22:18:49 +0800 Subject: [PATCH] Merge pull request #645 from IKangXu/wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 105 ++++++++++++++++++++++++++++++++++------------------ 1 files changed, 69 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 4ce30a2..33e42ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -20,6 +20,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; @@ -29,6 +30,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; @@ -41,7 +43,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; /** - * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰 + * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰,杩欐槸浣滀负涓婄骇鍙戦�佽闃呰姹傚悗锛岃澶囨墠浼氬搷搴旂殑 */ @Component public class NotifyRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { @@ -77,7 +79,7 @@ private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -92,38 +94,42 @@ @Override public void process(RequestEvent evt) { try { - taskQueue.offer(new HandlerCatchData(evt, null, null)); - responseAck(evt, Response.OK); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; - taskExecutor.execute(()-> { - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = getRootElement(take.getEvt()); - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } catch (DocumentException e) { - throw new RuntimeException(e); - } - } - taskQueueHandlerRun = false; - }); - } - } catch (SipException | InvalidArgumentException | ParseException e) { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + }catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + logger.info("鎺ユ敹鍒癈atalog閫氱煡"); + processNotifyCatalogList(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("鎺ユ敹鍒癆larm閫氱煡"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } + } catch (DocumentException e) { + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); + } + } + taskQueueHandlerRun = false; + }); } } @@ -139,6 +145,10 @@ // 鍥炲 200 OK Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + return; + } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); @@ -146,7 +156,7 @@ String channelId = deviceIdElement.getTextTrim().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { - if (!StringUtils.isEmpty(device.getName())) { + if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); } } @@ -195,6 +205,7 @@ } storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); jsonObject.put("time", time); @@ -225,6 +236,10 @@ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); + return; + } Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getText().toString(); @@ -234,6 +249,10 @@ return; } rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setDeviceId(deviceId); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); @@ -269,8 +288,6 @@ mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); - - // 鏇存柊device channel 鐨勭粡绾害 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); @@ -291,6 +308,18 @@ } storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("serial", deviceChannel.getDeviceId()); + jsonObject.put("code", deviceChannel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } // TODO: 闇�瑕佸疄鐜板瓨鍌ㄦ姤璀︿俊鎭�佹姤璀﹀垎绫� @@ -319,6 +348,10 @@ return; } Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); + return; + } Element deviceListElement = rootElement.element("DeviceList"); if (deviceListElement == null) { return; -- Gitblit v1.8.0