From ddb36e54bd51761138c536ccca889d3f80182334 Mon Sep 17 00:00:00 2001
From: lin <18010473990@163.com>
Date: 星期六, 08 一月 2022 16:47:20 +0800
Subject: [PATCH] 级联平台添加GPS订阅支持

---
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java                                    |   78 +++++++
 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java                              |    1 
 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java          |   52 +++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java |  136 ++++++++++++-
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java         |   49 ++++
 src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java                        |   23 +-
 src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java                                  |    4 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java    |    4 
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java                                          |    8 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java      |   28 ++
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                           |   43 ++++
 src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java                                       |    2 
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                                   |   17 +
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java             |   41 ++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java                 |   10 +
 src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java                                   |    7 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java    |   10 
 src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java                                 |   59 +++++
 18 files changed, 530 insertions(+), 42 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index e45055b..7f659ab 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -58,6 +58,10 @@
 
 	public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
 
+	public static final String SIP_SN_PREFIX = "VMP_SIP_SN_";
+
+	public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_";
+
 	//************************** redis 娑堟伅*********************************
 	public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
 	public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_";
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java
new file mode 100644
index 0000000..ed5cad6
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java
@@ -0,0 +1,8 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+public class CmdType {
+
+    public static final String CATALOG = "Catalog";
+    public static final String ALARM = "Alarm";
+    public static final String MOBILE_POSITION = "MobilePosition";
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
new file mode 100644
index 0000000..66d67bf
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
@@ -0,0 +1,78 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.*;
+import javax.sip.message.Request;
+
+public class SubscribeInfo {
+
+    public SubscribeInfo() {
+    }
+
+    public SubscribeInfo(RequestEvent evt, String id) {
+        this.id = id;
+        Request request = evt.getRequest();
+        CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
+        this.callId = callIdHeader.getCallId();
+        FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
+        this.fromTag = fromHeader.getTag();
+        ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
+        this.expires = expiresHeader.getExpires();
+        this.event = (EventHeader)request.getHeader(EventHeader.NAME);
+    }
+
+    private String id;
+    private int expires;
+    private String callId;
+    private EventHeader event;
+    private String fromTag;
+    private String toTag;
+
+    public String getId() {
+        return id;
+    }
+
+    public int getExpires() {
+        return expires;
+    }
+
+    public String getCallId() {
+        return callId;
+    }
+
+    public EventHeader getEvent() {
+        return event;
+    }
+
+    public String getFromTag() {
+        return fromTag;
+    }
+
+    public void setToTag(String toTag) {
+        this.toTag = toTag;
+    }
+
+    public String getToTag() {
+        return toTag;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setExpires(int expires) {
+        this.expires = expires;
+    }
+
+    public void setCallId(String callId) {
+        this.callId = callId;
+    }
+
+    public void setEvent(EventHeader event) {
+        this.event = event;
+    }
+
+    public void setFromTag(String fromTag) {
+        this.fromTag = fromTag;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
new file mode 100644
index 0000000..bac8e3d
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
@@ -0,0 +1,52 @@
+package com.genersoft.iot.vmp.gb28181.event.subscribe;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import org.checkerframework.checker.units.qual.A;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**    
+ * 骞冲彴璁㈤槄鍒版湡浜嬩欢
+ */
+@Component
+public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
+
+    private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
+
+	@Autowired
+	private UserSetup userSetup;
+
+    @Autowired
+    private DynamicTask dynamicTask;
+
+    public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
+        super(listenerContainer, userSetup);
+    }
+
+
+    /**
+     * 鐩戝惉澶辨晥鐨刱ey
+     * @param message
+     * @param pattern
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        //  鑾峰彇澶辨晥鐨刱ey
+        String expiredKey = message.toString();
+        logger.debug(expiredKey);
+        // 璁㈤槄鍒版湡
+        String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
+        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
+            // 鍙栨秷瀹氭椂浠诲姟
+            dynamicTask.stopCron(expiredKey);
+        }
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
new file mode 100644
index 0000000..ce990a0
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
@@ -0,0 +1,59 @@
+package com.genersoft.iot.vmp.gb28181.task;
+
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+public class GPSSubscribeTask implements Runnable{
+
+    private IRedisCatchStorage redisCatchStorage;
+    private IVideoManagerStorager storager;
+    private ISIPCommanderForPlatform sipCommanderForPlatform;
+    private String platformId;
+    private String sn;
+    private String key;
+
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
+        this.redisCatchStorage = redisCatchStorage;
+        this.storager = storager;
+        this.platformId = platformId;
+        this.sn = sn;
+        this.key = key;
+        this.sipCommanderForPlatform = sipCommanderForPlatform;
+    }
+
+    @Override
+    public void run() {
+
+        SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
+        if (subscribe != null) {
+            System.out.println("鍙戦�丟PS娑堟伅");
+            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
+            if (parentPlatform == null || parentPlatform.isStatus()) {
+                // TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔
+                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
+                if (gbStreams.size() > 0) {
+                    for (GbStream gbStream : gbStreams) {
+                        String gbId = gbStream.getGbId();
+                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
+                        if (gpsMsgInfo != null && gbStream.isStatus()) {
+                            // 鍙戦�丟PS娑堟伅
+                            sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
+                        }
+                    }
+                }
+            }
+        }
+
+
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
index fe30293..e8b4124 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
@@ -2,7 +2,9 @@
 
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 
 import javax.sip.header.WWWAuthenticateHeader;
 
@@ -61,4 +63,12 @@
      */
     boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag);
 
+    /**
+     * 鍚戜笂绾у洖澶嶇Щ鍔ㄤ綅缃闃呮秷鎭�
+     * @param parentPlatform 骞冲彴淇℃伅
+     * @param gpsMsgInfo GPS淇℃伅
+     * @param subscribeInfo 璁㈤槄鐩稿叧鐨勪俊鎭�
+     * @return
+     */
+    boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
index cc41af7..27125e1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -2,6 +2,7 @@
 
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import gov.nist.javax.sip.message.MessageFactoryImpl;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -32,6 +33,9 @@
 	@Autowired
 	private SipFactory sipFactory;
 
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
 
 	public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
 		Request request = null;
@@ -57,7 +61,7 @@
 		// Forwards
 		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
 		// ceq
-		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
 
 		request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
 				toHeader, viaHeaders, maxForwards);
@@ -122,7 +126,7 @@
 										 String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
 
 
-		Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader);
+		Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader);
 
 		String realm = www.getRealm();
 		String nonce = www.getNonce();
@@ -208,7 +212,7 @@
 		// Forwards
 		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
 		// ceq
-		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
 		MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
 		// 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜
 		messageFactory.setDefaultContentEncodingCharset("gb2312");
@@ -223,4 +227,43 @@
 		request.setContent(content, contentTypeHeader);
 		return request;
 	}
+
+    public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
+		Request request = null;
+		// sipuri
+		SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
+		// via
+		ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
+		ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
+				parentPlatform.getTransport(), null);
+		viaHeader.setRPort();
+		viaHeaders.add(viaHeader);
+		// from
+		SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
+				parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
+		Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
+		FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
+		// to
+		SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
+		Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
+		ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag);
+
+		// Forwards
+		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+		// ceq
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
+		MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
+		// 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜
+		messageFactory.setDefaultContentEncodingCharset("gb2312");
+		request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
+				toHeader, viaHeaders, maxForwards);
+		List<String> agentParam = new ArrayList<>();
+		agentParam.add("wvp-pro");
+		UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
+		request.addHeader(userAgentHeader);
+
+		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
+		request.setContent(content, contentTypeHeader);
+		return request;
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index cf8b0a5..1707bde 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -3,9 +3,11 @@
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +94,7 @@
                     callIdHeader = udpSipProvider.getNewCallId();
                 }
 
-                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader);
+                request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader);
                 // 灏� callid 鍐欏叆缂撳瓨锛� 绛夋敞鍐屾垚鍔熷彲浠ユ洿鏂扮姸鎬�
                 redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId());
 
@@ -325,4 +327,41 @@
         }
         return true;
     }
+
+    @Override
+    public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) {
+        if (parentPlatform == null) {
+            return false;
+        }
+
+        try {
+            StringBuffer deviceStatusXml = new StringBuffer(600);
+            deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
+            deviceStatusXml.append("<Notify>\r\n");
+            deviceStatusXml.append("<CmdType>MobilePosition</CmdType>\r\n");
+            deviceStatusXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+            deviceStatusXml.append("<DeviceID>" + gpsMsgInfo.getId() + "</DeviceID>\r\n");
+            deviceStatusXml.append("<Time>" + gpsMsgInfo.getTime() + "</Time>\r\n");
+            deviceStatusXml.append("<Longitude>" + gpsMsgInfo.getLng() + "</Longitude>\r\n");
+            deviceStatusXml.append("<Latitude>" + gpsMsgInfo.getLat() + "</Latitude>\r\n");
+            deviceStatusXml.append("<Speed>" + gpsMsgInfo.getSpeed() + "</Speed>\r\n");
+            deviceStatusXml.append("<Direction>" + gpsMsgInfo.getDirection() + "</Direction>\r\n");
+            deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n");
+            deviceStatusXml.append("</Notify>\r\n");
+
+            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
+                    : udpSipProvider.getNewCallId();
+            callIdHeader.setCallId(subscribeInfo.getCallId());
+
+            String tm = Long.toString(System.currentTimeMillis());
+
+            Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
+            transmitRequest(parentPlatform, request);
+
+        } catch (SipException | ParseException | InvalidArgumentException e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
index e7f8f72..d4de725 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -18,6 +18,7 @@
 import javax.sip.address.AddressFactory;
 import javax.sip.address.SipURI;
 import javax.sip.header.ContentTypeHeader;
+import javax.sip.header.ExpiresHeader;
 import javax.sip.header.HeaderFactory;
 import javax.sip.header.ViaHeader;
 import javax.sip.message.MessageFactory;
@@ -153,7 +154,7 @@
 	 * @throws InvalidArgumentException
 	 * @throws ParseException
 	 */
-	public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
+	public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
 		Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
 		SipFactory sipFactory = SipFactory.getInstance();
 		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
@@ -168,6 +169,31 @@
 		getServerTransaction(evt).sendResponse(response);
 	}
 
+	/**
+	 * 鍥炲甯ml鐨�200
+	 * @param evt
+	 * @param xml
+	 * @throws SipException
+	 * @throws InvalidArgumentException
+	 * @throws ParseException
+	 */
+	public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException {
+		Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
+		SipFactory sipFactory = SipFactory.getInstance();
+		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
+		response.setContent(xml, contentTypeHeader);
+
+		SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
+
+		Address concatAddress = sipFactory.createAddressFactory().createAddress(
+				sipFactory.createAddressFactory().createSipURI(sipURI.getUser(),  sipURI.getHost()+":"+sipURI.getPort()
+				));
+		response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
+		response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME));
+		getServerTransaction(evt).sendResponse(response);
+		return response;
+	}
+
 	public Element getRootElement(RequestEvent evt) throws DocumentException {
 		return getRootElement(evt, "gb2312");
 	}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 203d58b..2a9abad 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -253,7 +253,7 @@
 						content.append("f=\r\n");
 
 						try {
-							responseAck(evt, content.toString());
+							responseSdpAck(evt, content.toString());
 						} catch (SipException e) {
 							e.printStackTrace();
 						} catch (InvalidArgumentException e) {
@@ -310,7 +310,7 @@
 					content.append("f=\r\n");
 
 					try {
-						responseAck(evt, content.toString());
+						responseSdpAck(evt, content.toString());
 					} catch (SipException e) {
 						e.printStackTrace();
 					} catch (InvalidArgumentException e) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index 2e58d9d..67bb56c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -62,9 +62,7 @@
 	@Autowired
 	private DeviceOffLineDetector offLineDetector;
 
-	private static final String NOTIFY_CATALOG = "Catalog";
-	private static final String NOTIFY_ALARM = "Alarm";
-	private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
+
 	private String method = "NOTIFY";
 
 	@Autowired
@@ -82,13 +80,13 @@
 			Element rootElement = getRootElement(evt);
 			String cmd = XmlUtil.getText(rootElement, "CmdType");
 
-			if (NOTIFY_CATALOG.equals(cmd)) {
+			if (CmdType.CATALOG.equals(cmd)) {
 				logger.info("鎺ユ敹鍒癈atalog閫氱煡");
 				processNotifyCatalogList(evt);
-			} else if (NOTIFY_ALARM.equals(cmd)) {
+			} else if (CmdType.ALARM.equals(cmd)) {
 				logger.info("鎺ユ敹鍒癆larm閫氱煡");
 				processNotifyAlarm(evt);
-			} else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
+			} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
 				logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
 				processNotifyMobilePosition(evt);
 			} else {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
index be4b2ce..9c5be8e 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -1,8 +1,21 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.bean.CmdType;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -13,7 +26,10 @@
 import javax.sip.RequestEvent;
 import javax.sip.ServerTransaction;
 import javax.sip.SipException;
+import javax.sip.header.CallIdHeader;
 import javax.sip.header.ExpiresHeader;
+import javax.sip.header.Header;
+import javax.sip.header.ToHeader;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
 import java.text.ParseException;
@@ -29,6 +45,21 @@
 
 	@Autowired
 	private SIPProcessorObserver sipProcessorObserver;
+
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
+	@Autowired
+	private ISIPCommanderForPlatform sipCommanderForPlatform;
+
+	@Autowired
+	private IVideoManagerStorager storager;
+
+	@Autowired
+	private DynamicTask dynamicTask;
+
+	@Autowired
+	private UserSetup userSetup;
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -46,21 +77,39 @@
 		Request request = evt.getRequest();
 
 		try {
-			Response response = null;
-			response = getMessageFactory().createResponse(200, request);
-			if (response != null) {
-				ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
-				response.setExpires(expireHeader);
-			}
-			logger.info("response : " + response.toString());
-			ServerTransaction transaction = getServerTransaction(evt);
-			if (transaction != null) {
-				transaction.sendResponse(response);
-				transaction.getDialog().delete();
-				transaction.terminate();
+			Element rootElement = getRootElement(evt);
+			String cmd = XmlUtil.getText(rootElement, "CmdType");
+			if (CmdType.MOBILE_POSITION.equals(cmd)) {
+				logger.info("鎺ユ敹鍒癕obilePosition璁㈤槄");
+				processNotifyMobilePosition(evt, rootElement);
+//			} else if (CmdType.ALARM.equals(cmd)) {
+//				logger.info("鎺ユ敹鍒癆larm璁㈤槄");
+//				processNotifyAlarm(evt, rootElement);
+//			} else if (CmdType.CATALOG.equals(cmd)) {
+//				logger.info("鎺ユ敹鍒癈atalog璁㈤槄");
+//				processNotifyCatalogList(evt, rootElement);
 			} else {
-				logger.info("processRequest serverTransactionId is null.");
+				logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
+//				responseAck(evt, Response.OK);
+
+				Response response = null;
+				response = getMessageFactory().createResponse(200, request);
+				if (response != null) {
+					ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
+					response.setExpires(expireHeader);
+				}
+				logger.info("response : " + response.toString());
+				ServerTransaction transaction = getServerTransaction(evt);
+				if (transaction != null) {
+					transaction.sendResponse(response);
+					transaction.getDialog().delete();
+					transaction.terminate();
+				} else {
+					logger.info("processRequest serverTransactionId is null.");
+				}
 			}
+
+
 
 		} catch (ParseException e) {
 			e.printStackTrace();
@@ -68,8 +117,67 @@
 			e.printStackTrace();
 		} catch (InvalidArgumentException e) {
 			e.printStackTrace();
+		} catch (DocumentException e) {
+			e.printStackTrace();
 		}
-		
+
+	}
+
+	/**
+	 * 澶勭悊绉诲姩浣嶇疆璁㈤槄娑堟伅
+	 */
+	private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
+		String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
+		String deviceID = XmlUtil.getText(rootElement, "DeviceID");
+		SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
+		String sn = XmlUtil.getText(rootElement, "SN");
+		String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_MobilePosition_" + platformId;
+
+		StringBuilder resultXml = new StringBuilder(200);
+		resultXml.append("<?xml version=\"1.0\" ?>\r\n")
+				.append("<Response>\r\n")
+				.append("<CmdType>MobilePosition</CmdType>\r\n")
+				.append("<SN>" + sn + "</SN>\r\n")
+				.append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
+				.append("<Result>OK</Result>\r\n")
+				.append("</Response>\r\n");
+
+		if (subscribeInfo.getExpires() > 0) {
+			if (redisCatchStorage.getSubscribe(key) != null) {
+				dynamicTask.stopCron(key);
+			}
+			String interval = XmlUtil.getText(rootElement, "Interval"); // GPS涓婃姤鏃堕棿闂撮殧
+			dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key), Integer.parseInt(interval));
+
+			redisCatchStorage.updateSubscribe(key, subscribeInfo);
+		}else if (subscribeInfo.getExpires() == 0) {
+			dynamicTask.stopCron(key);
+			redisCatchStorage.delSubscribe(key);
+		}
+
+
+
+		try {
+			Response response = responseXmlAck(evt, resultXml.toString());
+			ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
+			subscribeInfo.setToTag(toHeader.getTag());
+			redisCatchStorage.updateSubscribe(key, subscribeInfo);
+
+		} catch (SipException e) {
+			e.printStackTrace();
+		} catch (InvalidArgumentException e) {
+			e.printStackTrace();
+		} catch (ParseException e) {
+			e.printStackTrace();
+		}
+	}
+
+	private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
+
+	}
+
+	private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
+
 	}
 
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
index 3c5b2bf..855cc5e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java
@@ -23,7 +23,7 @@
     private double speed;
 
     /**
-     * 浜х敓閫氱煡鏃堕棿,
+     * 浜х敓閫氱煡鏃堕棿, 鏃堕棿鏍煎紡锛� 2020-01-14T14:32:12
      */
     private String time;
 
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
index e3bfcde..4e390b7 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java
@@ -17,6 +17,7 @@
     @Override
     public void onMessage(Message message, byte[] bytes) {
         GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
+        System.out.println(JSON.toJSON(gpsMsgInfo));
         redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 709d548..ea85cf1 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -2,10 +2,7 @@
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -196,4 +193,16 @@
     void resetAllCSEQ();
 
     void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
+
+    GPSMsgInfo getGpsMsgInfo(String gbId);
+
+    Long getSN(String method);
+
+    void resetAllSN();
+
+    void updateSubscribe(String key, SubscribeInfo subscribeInfo);
+
+    SubscribeInfo getSubscribe(String key);
+
+    void delSubscribe(String key);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index fa6b51c..3a26b4e 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -54,6 +54,11 @@
             "WHERE pgs.platformId = '${platformId}'")
     List<GbStream> queryGbStreamListInPlatform(String platformId);
 
+
+    @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs  LEFT JOIN platform_gb_stream pgs " +
+            "ON  gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL")
+    List<GbStream> queryStreamNotInPlatform();
+
     @Update("UPDATE gb_stream " +
             "SET status=${status} " +
             "WHERE app=#{app} AND stream=#{stream}")
@@ -87,4 +92,6 @@
             "</foreach> " +
             "</script>")
     void batchAdd(List<StreamPushItem> subList);
+
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 6ea768a..adda231 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -50,8 +50,30 @@
     }
 
     @Override
+    public Long getSN(String method) {
+        String key = VideoManagerConstants.SIP_SN_PREFIX  + userSetup.getServerId() + "_" +  method;
+
+        long result =  redis.incr(key, 1L);
+        if (result > Integer.MAX_VALUE) {
+            redis.set(key, 1);
+            result = 1;
+        }
+        return result;
+    }
+
+    @Override
     public void resetAllCSEQ() {
         String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_*";
+        List<Object> keys = redis.scan(scanKey);
+        for (int i = 0; i < keys.size(); i++) {
+            String key = (String) keys.get(i);
+            redis.set(key, 1);
+        }
+    }
+
+    @Override
+    public void resetAllSN() {
+        String scanKey = VideoManagerConstants.SIP_SN_PREFIX  + userSetup.getServerId() + "_*";
         List<Object> keys = redis.scan(scanKey);
         for (int i = 0; i < keys.size(); i++) {
             String key = (String) keys.get(i);
@@ -433,4 +455,25 @@
         String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId();
         redis.set(key, gpsMsgInfo);
     }
+
+    @Override
+    public GPSMsgInfo getGpsMsgInfo(String gbId) {
+        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId;
+        return (GPSMsgInfo)redis.get(key);
+    }
+
+    @Override
+    public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
+        redis.set(key, subscribeInfo, subscribeInfo.getExpires());
+    }
+
+    @Override
+    public SubscribeInfo getSubscribe(String key) {
+        return (SubscribeInfo)redis.get(key);
+    }
+
+    @Override
+    public void delSubscribe(String key) {
+        redis.del(key);
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
index 7f0efcd..d381574 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -486,18 +486,21 @@
 		// 鏇存柊缂撳瓨
 		parentPlatformCatch.setParentPlatform(parentPlatform);
 		redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
-		// 鍏变韩鎵�鏈夎棰戞祦锛岄渶瑕佸皢鐜版湁瑙嗛娴佹坊鍔犲埌姝ゅ钩鍙�
-		List<GbStream> gbStreams = gbStreamMapper.selectAll();
-		if (gbStreams.size() > 0) {
-			for (GbStream gbStream : gbStreams) {
-				gbStream.setCatalogId(parentPlatform.getCatalogId());
-			}
-			if (parentPlatform.isShareAllLiveStream()) {
-				gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
-			}else {
-				gbStreamService.delPlatformInfo(gbStreams);
+		if (parentPlatform.isEnable()) {
+			// 鍏变韩鎵�鏈夎棰戞祦锛岄渶瑕佸皢鐜版湁瑙嗛娴佹坊鍔犲埌姝ゅ钩鍙�
+			List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
+			if (gbStreams.size() > 0) {
+				for (GbStream gbStream : gbStreams) {
+					gbStream.setCatalogId(parentPlatform.getCatalogId());
+				}
+				if (parentPlatform.isShareAllLiveStream()) {
+					gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
+				}else {
+					gbStreamService.delPlatformInfo(gbStreams);
+				}
 			}
 		}
+
 		return result > 0;
 	}
 

--
Gitblit v1.8.0