From fd091e545ba174adc36a9d3370e6d4c040ad33fd Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 28 七月 2022 16:18:41 +0800
Subject: [PATCH] 优化hook订阅机制

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java |   89 +++++++++++++++++++++++++++-----------------
 1 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 7b144d1..feb66b4 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -10,6 +10,9 @@
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -348,25 +351,19 @@
 	@Override
 	public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
 							  ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
-		String streamId = ssrcInfo.getStream();
+		String stream = ssrcInfo.getStream();
 		try {
 			if (device == null) {
 				return;
 			}
 			String streamMode = device.getStreamMode().toUpperCase();
 
-			logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
-			// 娣诲姞璁㈤槄
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", streamId);
-			subscribeKey.put("regist", true);
-			subscribeKey.put("schema", "rtmp");
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 				if (event != null) {
 					event.response(mediaServerItemInUse, json);
+					subscribe.removeSubscribe(hookSubscribe);
 				}
 			});
 			//
@@ -440,7 +437,7 @@
 				errorEvent.response(e);
 			}), e ->{
 				// 杩欓噷涓轰緥閬垮厤涓�涓�氶亾鐨勭偣鎾彧鏈変竴涓猚allID杩欎釜鍙傛暟浣跨敤涓�涓浐瀹氬��
-				streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
+				streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
 				streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
 				okEvent.response(e);
 			});
@@ -530,21 +527,14 @@
 
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
-
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
 			// 娣诲姞璁㈤槄
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", ssrcInfo.getStream());
-			subscribeKey.put("regist", true);
-			subscribeKey.put("schema", "rtmp");
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey);
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 						if (hookEvent != null) {
 							InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
 							hookEvent.call(inviteStreamInfo);
 						}
+						subscribe.removeSubscribe(hookSubscribe);
 					});
 	        Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
 
@@ -643,21 +633,15 @@
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
 
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
 			// 娣诲姞璁㈤槄
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", ssrcInfo.getStream());
-			subscribeKey.put("regist", true);
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString());
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 						hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
-						subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
-						subscribeKey.put("regist", false);
-						subscribeKey.put("schema", "rtmp");
+						subscribe.removeSubscribe(hookSubscribe);
+						hookSubscribe.getContent().put("regist", false);
+						hookSubscribe.getContent().put("schema", "rtmp");
 						// 娣诲姞娴佹敞閿�鐨勮闃咃紝娉ㄩ攢浜嗗悗鍚戣澶囧彂閫乥ye
-						subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
+						subscribe.addSubscribe(hookSubscribe,
 								(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
 									ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
 									if (transaction != null) {
@@ -1828,6 +1812,43 @@
 			e.printStackTrace();
 		}
 	}
+	
+	@Override
+	public void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) {
+		try {
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content);
+			if (request == null) {
+				return;
+			}
+			logger.info(request.toString());
+			ClientTransaction clientTransaction = null;
+			if ("TCP".equals(device.getTransport())) {
+				clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+			} else if ("UDP".equals(device.getTransport())) {
+				clientTransaction = udpSipProvider.getNewClientTransaction(request);
+			}
+			CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
+			if(errorEvent != null) {
+				sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> {
+					errorEvent.response(eventResult);
+					sipSubscribe.removeErrorSubscribe(eventResult.callId);
+					sipSubscribe.removeOkSubscribe(eventResult.callId);
+				}));
+			}
+			
+			if(okEvent != null) {
+				sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult -> {
+					okEvent.response(eventResult);
+					sipSubscribe.removeOkSubscribe(eventResult.callId);
+					sipSubscribe.removeErrorSubscribe(eventResult.callId);
+				});
+			}
+			clientTransaction.sendRequest();
+			
+		} catch (SipException | ParseException | InvalidArgumentException e) {
+			e.printStackTrace();
+		}
+	}
 
 	@Override
 	public boolean sendAlarmMessage(Device device, DeviceAlarm deviceAlarm) {

--
Gitblit v1.8.0