From 82adc0cb23f3ee47322e78889cdaba57e9309000 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 21 三月 2023 15:55:24 +0800 Subject: [PATCH] 完善语音对讲级联 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 92 ++++++++++++++++++++++++++++++++++------------ 1 files changed, 68 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 25261f3..a468926 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,6 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; @@ -20,6 +19,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; @@ -29,7 +29,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; +import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -38,10 +38,11 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** - * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰 + * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰,杩欐槸浣滀负涓婄骇鍙戦�佽闃呰姹傚悗锛岃澶囨墠浼氬搷搴旂殑 */ @Component public class NotifyRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { @@ -75,9 +76,7 @@ @Autowired private IDeviceChannelService deviceChannelService; - private boolean taskQueueHandlerRun = false; - - private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -92,15 +91,23 @@ @Override public void process(RequestEvent evt) { try { - taskQueue.offer(new HandlerCatchData(evt, null, null)); - responseAck(evt, Response.OK); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; - taskExecutor.execute(()-> { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + }catch (SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + boolean runed = !taskQueue.isEmpty(); + taskQueue.offer(new HandlerCatchData(evt, null, null)); + if (!runed) { + taskExecutor.execute(()-> { + try { while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); + continue; + } String cmd = XmlUtil.getText(rootElement, "CmdType"); if (CmdType.CATALOG.equals(cmd)) { @@ -116,14 +123,13 @@ logger.info("鎺ユ敹鍒版秷鎭細" + cmd); } } catch (DocumentException e) { - throw new RuntimeException(e); + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); } } - taskQueueHandlerRun = false; - }); - } - } catch (SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); + }catch (Exception e) { + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); + } + }); } } @@ -139,14 +145,29 @@ // 鍥炲 200 OK Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + return; + } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getTextTrim().toString(); Device device = redisCatchStorage.getDevice(deviceId); + + if (device == null) { + // 鏍规嵁閫氶亾id鏌ヨ璁惧Id + List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (deviceList.size() > 0) { + device = deviceList.get(0); + }else { + logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); + return; + } + } if (device != null) { - if (!StringUtils.isEmpty(device.getName())) { + if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); } } @@ -195,6 +216,7 @@ } storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); jsonObject.put("time", time); @@ -225,6 +247,10 @@ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); + return; + } Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getText().toString(); @@ -234,6 +260,10 @@ return; } rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setDeviceId(deviceId); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); @@ -269,8 +299,6 @@ mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); - - // 鏇存柊device channel 鐨勭粡绾害 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); @@ -291,6 +319,18 @@ } storager.updateChannelPosition(deviceChannel); + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("serial", deviceChannel.getDeviceId()); + jsonObject.put("code", deviceChannel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } // TODO: 闇�瑕佸疄鐜板瓨鍌ㄦ姤璀︿俊鎭�佹姤璀﹀垎绫� @@ -315,10 +355,14 @@ Device device = redisCatchStorage.getDevice(deviceId); if (device == null || device.getOnline() == 0) { - logger.warn("[鏀跺埌 鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); return; } Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); + return; + } Element deviceListElement = rootElement.element("DeviceList"); if (deviceListElement == null) { return; @@ -336,14 +380,14 @@ Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { - logger.warn("[鏀跺埌 鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); event = CatalogEvent.ADD; }else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event); channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌 鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { case CatalogEvent.ON: // 涓婄嚎 -- Gitblit v1.8.0