From ddb36e54bd51761138c536ccca889d3f80182334 Mon Sep 17 00:00:00 2001
From: lin <18010473990@163.com>
Date: 星期六, 08 一月 2022 16:47:20 +0800
Subject: [PATCH] 级联平台添加GPS订阅支持
---
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java | 78 +++++++
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java | 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java | 52 +++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java | 136 ++++++++++++-
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java | 49 ++++
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java | 23 +-
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 4
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java | 8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java | 28 ++
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 43 ++++
src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java | 2
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 17 +
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 41 ++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java | 10 +
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 10
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java | 59 +++++
18 files changed, 530 insertions(+), 42 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index e45055b..7f659ab 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -58,6 +58,10 @@
public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
+ public static final String SIP_SN_PREFIX = "VMP_SIP_SN_";
+
+ public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_";
+
//************************** redis 娑堟伅*********************************
public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_";
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java
new file mode 100644
index 0000000..ed5cad6
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java
@@ -0,0 +1,8 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+public class CmdType {
+
+ public static final String CATALOG = "Catalog";
+ public static final String ALARM = "Alarm";
+ public static final String MOBILE_POSITION = "MobilePosition";
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
new file mode 100644
index 0000000..66d67bf
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
@@ -0,0 +1,78 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.*;
+import javax.sip.message.Request;
+
+public class SubscribeInfo {
+
+ public SubscribeInfo() {
+ }
+
+ public SubscribeInfo(RequestEvent evt, String id) {
+ this.id = id;
+ Request request = evt.getRequest();
+ CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
+ this.callId = callIdHeader.getCallId();
+ FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
+ this.fromTag = fromHeader.getTag();
+ ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
+ this.expires = expiresHeader.getExpires();
+ this.event = (EventHeader)request.getHeader(EventHeader.NAME);
+ }
+
+ private String id;
+ private int expires;
+ private String callId;
+ private EventHeader event;
+ private String fromTag;
+ private String toTag;
+
+ public String getId() {
+ return id;
+ }
+
+ public int getExpires() {
+ return expires;
+ }
+
+ public String getCallId() {
+ return callId;
+ }
+
+ public EventHeader getEvent() {
+ return event;
+ }
+
+ public String getFromTag() {
+ return fromTag;
+ }
+
+ public void setToTag(String toTag) {
+ this.toTag = toTag;
+ }
+
+ public String getToTag() {
+ return toTag;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setExpires(int expires) {
+ this.expires = expires;
+ }
+
+ public void setCallId(String callId) {
+ this.callId = callId;
+ }
+
+ public void setEvent(EventHeader event) {
+ this.event = event;
+ }
+
+ public void setFromTag(String fromTag) {
+ this.fromTag = fromTag;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
new file mode 100644
index 0000000..bac8e3d
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
@@ -0,0 +1,52 @@
+package com.genersoft.iot.vmp.gb28181.event.subscribe;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import org.checkerframework.checker.units.qual.A;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**
+ * 骞冲彴璁㈤槄鍒版湡浜嬩欢
+ */
+@Component
+public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
+
+ private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
+
+ @Autowired
+ private UserSetup userSetup;
+
+ @Autowired
+ private DynamicTask dynamicTask;
+
+ public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
+ super(listenerContainer, userSetup);
+ }
+
+
+ /**
+ * 鐩戝惉澶辨晥鐨刱ey
+ * @param message
+ * @param pattern
+ */
+ @Override
+ public void onMessage(Message message, byte[] pattern) {
+ // 鑾峰彇澶辨晥鐨刱ey
+ String expiredKey = message.toString();
+ logger.debug(expiredKey);
+ // 璁㈤槄鍒版湡
+ String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
+ if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
+ // 鍙栨秷瀹氭椂浠诲姟
+ dynamicTask.stopCron(expiredKey);
+ }
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
new file mode 100644
index 0000000..ce990a0
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
@@ -0,0 +1,59 @@
+package com.genersoft.iot.vmp.gb28181.task;
+
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+public class GPSSubscribeTask implements Runnable{
+
+ private IRedisCatchStorage redisCatchStorage;
+ private IVideoManagerStorager storager;
+ private ISIPCommanderForPlatform sipCommanderForPlatform;
+ private String platformId;
+ private String sn;
+ private String key;
+
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
+ this.redisCatchStorage = redisCatchStorage;
+ this.storager = storager;
+ this.platformId = platformId;
+ this.sn = sn;
+ this.key = key;
+ this.sipCommanderForPlatform = sipCommanderForPlatform;
+ }
+
+ @Override
+ public void run() {
+
+ SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
+ if (subscribe != null) {
+ System.out.println("鍙戦�丟PS娑堟伅");
+ ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
+ if (parentPlatform == null || parentPlatform.isStatus()) {
+ // TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔
+ List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
+ if (gbStreams.size() > 0) {
+ for (GbStream gbStream : gbStreams) {
+ String gbId = gbStream.getGbId();
+ GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
+ if (gpsMsgInfo != null && gbStream.isStatus()) {
+ // 鍙戦�丟PS娑堟伅
+ sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
+ }
+ }
+ }
+ }
+ }
+
+
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
index fe30293..e8b4124 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
@@ -2,7 +2,9 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import javax.sip.header.WWWAuthenticateHeader;
@@ -61,4 +63,12 @@
*/
boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag);
+ /**
+ * 鍚戜笂绾у洖澶嶇Щ鍔ㄤ綅缃闃呮秷鎭�
+ * @param parentPlatform 骞冲彴淇℃伅
+ * @param gpsMsgInfo GPS淇℃伅
+ * @param subscribeInfo 璁㈤槄鐩稿叧鐨勪俊鎭�
+ * @return
+ */
+ boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
index cc41af7..27125e1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -32,6 +33,9 @@
@Autowired
private SipFactory sipFactory;
+ @Autowired
+ private IRedisCatchStorage redisCatchStorage;
+
public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
@@ -57,7 +61,7 @@
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
- CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
+ CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
@@ -122,7 +126,7 @@
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
- Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader);
+ Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader);
String realm = www.getRealm();
String nonce = www.getNonce();
@@ -208,7 +212,7 @@
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
- CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
+ CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜
messageFactory.setDefaultContentEncodingCharset("gb2312");
@@ -223,4 +227,43 @@
request.setContent(content, contentTypeHeader);
return request;
}
+
+ public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
+ Request request = null;
+ // sipuri
+ SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
+ // via
+ ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
+ ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
+ parentPlatform.getTransport(), null);
+ viaHeader.setRPort();
+ viaHeaders.add(viaHeader);
+ // from
+ SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
+ parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
+ Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
+ FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
+ // to
+ SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
+ Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
+ ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag);
+
+ // Forwards
+ MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+ // ceq
+ CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
+ MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
+ // 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜
+ messageFactory.setDefaultContentEncodingCharset("gb2312");
+ request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
+ toHeader, viaHeaders, maxForwards);
+ List<String> agentParam = new ArrayList<>();
+ agentParam.add("wvp-pro");
+ UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
+ request.addHeader(userAgentHeader);
+
+ ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
+ request.setContent(content, contentTypeHeader);
+ return request;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index cf8b0a5..1707bde 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -3,9 +3,11 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +94,7 @@
callIdHeader = udpSipProvider.getNewCallId();
}
- request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader);
+ request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader);
// 灏� callid 鍐欏叆缂撳瓨锛� 绛夋敞鍐屾垚鍔熷彲浠ユ洿鏂扮姸鎬�
redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId());
@@ -325,4 +327,41 @@
}
return true;
}
+
+ @Override
+ public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) {
+ if (parentPlatform == null) {
+ return false;
+ }
+
+ try {
+ StringBuffer deviceStatusXml = new StringBuffer(600);
+ deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+ deviceStatusXml.append("<Notify>\r\n");
+ deviceStatusXml.append("<CmdType>MobilePosition</CmdType>\r\n");
+ deviceStatusXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+ deviceStatusXml.append("<DeviceID>" + gpsMsgInfo.getId() + "</DeviceID>\r\n");
+ deviceStatusXml.append("<Time>" + gpsMsgInfo.getTime() + "</Time>\r\n");
+ deviceStatusXml.append("<Longitude>" + gpsMsgInfo.getLng() + "</Longitude>\r\n");
+ deviceStatusXml.append("<Latitude>" + gpsMsgInfo.getLat() + "</Latitude>\r\n");
+ deviceStatusXml.append("<Speed>" + gpsMsgInfo.getSpeed() + "</Speed>\r\n");
+ deviceStatusXml.append("<Direction>" + gpsMsgInfo.getDirection() + "</Direction>\r\n");
+ deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n");
+ deviceStatusXml.append("</Notify>\r\n");
+
+ CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
+ : udpSipProvider.getNewCallId();
+ callIdHeader.setCallId(subscribeInfo.getCallId());
+
+ String tm = Long.toString(System.currentTimeMillis());
+
+ Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
+ transmitRequest(parentPlatform, request);
+
+ } catch (SipException | ParseException | InvalidArgumentException e) {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
index e7f8f72..d4de725 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -18,6 +18,7 @@
import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
+import javax.sip.header.ExpiresHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
@@ -153,7 +154,7 @@
* @throws InvalidArgumentException
* @throws ParseException
*/
- public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
+ public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
SipFactory sipFactory = SipFactory.getInstance();
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
@@ -168,6 +169,31 @@
getServerTransaction(evt).sendResponse(response);
}
+ /**
+ * 鍥炲甯ml鐨�200
+ * @param evt
+ * @param xml
+ * @throws SipException
+ * @throws InvalidArgumentException
+ * @throws ParseException
+ */
+ public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException {
+ Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
+ SipFactory sipFactory = SipFactory.getInstance();
+ ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
+ response.setContent(xml, contentTypeHeader);
+
+ SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
+
+ Address concatAddress = sipFactory.createAddressFactory().createAddress(
+ sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort()
+ ));
+ response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
+ response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME));
+ getServerTransaction(evt).sendResponse(response);
+ return response;
+ }
+
public Element getRootElement(RequestEvent evt) throws DocumentException {
return getRootElement(evt, "gb2312");
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 203d58b..2a9abad 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -253,7 +253,7 @@
content.append("f=\r\n");
try {
- responseAck(evt, content.toString());
+ responseSdpAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
@@ -310,7 +310,7 @@
content.append("f=\r\n");
try {
- responseAck(evt, content.toString());
+ responseSdpAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index 2e58d9d..67bb56c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -62,9 +62,7 @@
@Autowired
private DeviceOffLineDetector offLineDetector;
- private static final String NOTIFY_CATALOG = "Catalog";
- private static final String NOTIFY_ALARM = "Alarm";
- private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
+
private String method = "NOTIFY";
@Autowired
@@ -82,13 +80,13 @@
Element rootElement = getRootElement(evt);
String cmd = XmlUtil.getText(rootElement, "CmdType");
- if (NOTIFY_CATALOG.equals(cmd)) {
+ if (CmdType.CATALOG.equals(cmd)) {
logger.info("鎺ユ敹鍒癈atalog閫氱煡");
processNotifyCatalogList(evt);
- } else if (NOTIFY_ALARM.equals(cmd)) {
+ } else if (CmdType.ALARM.equals(cmd)) {
logger.info("鎺ユ敹鍒癆larm閫氱煡");
processNotifyAlarm(evt);
- } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
+ } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
processNotifyMobilePosition(evt);
} else {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
index be4b2ce..9c5be8e 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -1,8 +1,21 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.bean.CmdType;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -13,7 +26,10 @@
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
+import javax.sip.header.CallIdHeader;
import javax.sip.header.ExpiresHeader;
+import javax.sip.header.Header;
+import javax.sip.header.ToHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -29,6 +45,21 @@
@Autowired
private SIPProcessorObserver sipProcessorObserver;
+
+ @Autowired
+ private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private ISIPCommanderForPlatform sipCommanderForPlatform;
+
+ @Autowired
+ private IVideoManagerStorager storager;
+
+ @Autowired
+ private DynamicTask dynamicTask;
+
+ @Autowired
+ private UserSetup userSetup;
@Override
public void afterPropertiesSet() throws Exception {
@@ -46,21 +77,39 @@
Request request = evt.getRequest();
try {
- Response response = null;
- response = getMessageFactory().createResponse(200, request);
- if (response != null) {
- ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
- response.setExpires(expireHeader);
- }
- logger.info("response : " + response.toString());
- ServerTransaction transaction = getServerTransaction(evt);
- if (transaction != null) {
- transaction.sendResponse(response);
- transaction.getDialog().delete();
- transaction.terminate();
+ Element rootElement = getRootElement(evt);
+ String cmd = XmlUtil.getText(rootElement, "CmdType");
+ if (CmdType.MOBILE_POSITION.equals(cmd)) {
+ logger.info("鎺ユ敹鍒癕obilePosition璁㈤槄");
+ processNotifyMobilePosition(evt, rootElement);
+// } else if (CmdType.ALARM.equals(cmd)) {
+// logger.info("鎺ユ敹鍒癆larm璁㈤槄");
+// processNotifyAlarm(evt, rootElement);
+// } else if (CmdType.CATALOG.equals(cmd)) {
+// logger.info("鎺ユ敹鍒癈atalog璁㈤槄");
+// processNotifyCatalogList(evt, rootElement);
} else {
- logger.info("processRequest serverTransactionId is null.");
+ logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
+// responseAck(evt, Response.OK);
+
+ Response response = null;
+ response = getMessageFactory().createResponse(200, request);
+ if (response != null) {
+ ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
+ response.setExpires(expireHeader);
+ }
+ logger.info("response : " + response.toString());
+ ServerTransaction transaction = getServerTransaction(evt);
+ if (transaction != null) {
+ transaction.sendResponse(response);
+ transaction.getDialog().delete();
+ transaction.terminate();
+ } else {
+ logger.info("processRequest serverTransactionId is null.");
+ }
}
+
+
} catch (ParseException e) {
e.printStackTrace();
@@ -68,8 +117,67 @@
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
+ } catch (DocumentException e) {
+ e.printStackTrace();
}
-
+
+ }
+
+ /**
+ * 澶勭悊绉诲姩浣嶇疆璁㈤槄娑堟伅
+ */
+ private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
+ String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
+ String deviceID = XmlUtil.getText(rootElement, "DeviceID");
+ SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
+ String sn = XmlUtil.getText(rootElement, "SN");
+ String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + platformId;
+
+ StringBuilder resultXml = new StringBuilder(200);
+ resultXml.append("<?xml version=\"1.0\" ?>\r\n")
+ .append("<Response>\r\n")
+ .append("<CmdType>MobilePosition</CmdType>\r\n")
+ .append("<SN>" + sn + "</SN>\r\n")
+ .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
+ .append("<Result>OK</Result>\r\n")
+ .append("</Response>\r\n");
+
+ if (subscribeInfo.getExpires() > 0) {
+ if (redisCatchStorage.getSubscribe(key) != null) {
+ dynamicTask.stopCron(key);
+ }
+ String interval = XmlUtil.getText(rootElement, "Interval"); // GPS涓婃姤鏃堕棿闂撮殧
+ dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval));
+
+ redisCatchStorage.updateSubscribe(key, subscribeInfo);
+ }else if (subscribeInfo.getExpires() == 0) {
+ dynamicTask.stopCron(key);
+ redisCatchStorage.delSubscribe(key);
+ }
+
+
+
+ try {
+ Response response = responseXmlAck(evt, resultXml.toString());
+ ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
+ subscribeInfo.setToTag(toHeader.getTag());
+ redisCatchStorage.updateSubscribe(key, subscribeInfo);
+
+ } catch (SipException e) {
+ e.printStackTrace();
+ } catch (InvalidArgumentException e) {
+ e.printStackTrace();
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
+
+ }
+
+ private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
+
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
index 3c5b2bf..855cc5e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
@@ -23,7 +23,7 @@
private double speed;
/**
- * 浜х敓閫氱煡鏃堕棿,
+ * 浜х敓閫氱煡鏃堕棿, 鏃堕棿鏍煎紡锛� 2020-01-14T14:32:12
*/
private String time;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
index e3bfcde..4e390b7 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
@@ -17,6 +17,7 @@
@Override
public void onMessage(Message message, byte[] bytes) {
GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
+ System.out.println(JSON.toJSON(gpsMsgInfo));
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 709d548..ea85cf1 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -2,10 +2,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -196,4 +193,16 @@
void resetAllCSEQ();
void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
+
+ GPSMsgInfo getGpsMsgInfo(String gbId);
+
+ Long getSN(String method);
+
+ void resetAllSN();
+
+ void updateSubscribe(String key, SubscribeInfo subscribeInfo);
+
+ SubscribeInfo getSubscribe(String key);
+
+ void delSubscribe(String key);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index fa6b51c..3a26b4e 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -54,6 +54,11 @@
"WHERE pgs.platformId = '${platformId}'")
List<GbStream> queryGbStreamListInPlatform(String platformId);
+
+ @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
+ "ON gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL")
+ List<GbStream> queryStreamNotInPlatform();
+
@Update("UPDATE gb_stream " +
"SET status=${status} " +
"WHERE app=#{app} AND stream=#{stream}")
@@ -87,4 +92,6 @@
"</foreach> " +
"</script>")
void batchAdd(List<StreamPushItem> subList);
+
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 6ea768a..adda231 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -50,8 +50,30 @@
}
@Override
+ public Long getSN(String method) {
+ String key = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_" + method;
+
+ long result = redis.incr(key, 1L);
+ if (result > Integer.MAX_VALUE) {
+ redis.set(key, 1);
+ result = 1;
+ }
+ return result;
+ }
+
+ @Override
public void resetAllCSEQ() {
String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_*";
+ List<Object> keys = redis.scan(scanKey);
+ for (int i = 0; i < keys.size(); i++) {
+ String key = (String) keys.get(i);
+ redis.set(key, 1);
+ }
+ }
+
+ @Override
+ public void resetAllSN() {
+ String scanKey = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_*";
List<Object> keys = redis.scan(scanKey);
for (int i = 0; i < keys.size(); i++) {
String key = (String) keys.get(i);
@@ -433,4 +455,25 @@
String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId();
redis.set(key, gpsMsgInfo);
}
+
+ @Override
+ public GPSMsgInfo getGpsMsgInfo(String gbId) {
+ String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId;
+ return (GPSMsgInfo)redis.get(key);
+ }
+
+ @Override
+ public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
+ redis.set(key, subscribeInfo, subscribeInfo.getExpires());
+ }
+
+ @Override
+ public SubscribeInfo getSubscribe(String key) {
+ return (SubscribeInfo)redis.get(key);
+ }
+
+ @Override
+ public void delSubscribe(String key) {
+ redis.del(key);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
index 7f0efcd..d381574 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -486,18 +486,21 @@
// 鏇存柊缂撳瓨
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
- // 鍏变韩鎵�鏈夎棰戞祦锛岄渶瑕佸皢鐜版湁瑙嗛娴佹坊鍔犲埌姝ゅ钩鍙�
- List<GbStream> gbStreams = gbStreamMapper.selectAll();
- if (gbStreams.size() > 0) {
- for (GbStream gbStream : gbStreams) {
- gbStream.setCatalogId(parentPlatform.getCatalogId());
- }
- if (parentPlatform.isShareAllLiveStream()) {
- gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
- }else {
- gbStreamService.delPlatformInfo(gbStreams);
+ if (parentPlatform.isEnable()) {
+ // 鍏变韩鎵�鏈夎棰戞祦锛岄渶瑕佸皢鐜版湁瑙嗛娴佹坊鍔犲埌姝ゅ钩鍙�
+ List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
+ if (gbStreams.size() > 0) {
+ for (GbStream gbStream : gbStreams) {
+ gbStream.setCatalogId(parentPlatform.getCatalogId());
+ }
+ if (parentPlatform.isShareAllLiveStream()) {
+ gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
+ }else {
+ gbStreamService.delPlatformInfo(gbStreams);
+ }
}
}
+
return result > 0;
}
--
Gitblit v1.8.0