From a71063dd1fc25d99486b36ba65c3081a3c8c7c01 Mon Sep 17 00:00:00 2001
From: lawrencehj <1934378145@qq.com>
Date: 星期日, 14 三月 2021 21:20:47 +0800
Subject: [PATCH] 增加上级点播停止后通知设备停止推流功能,并自动与本地播放协同

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java                     |   25 +++++++-----
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java                     |   10 +++++
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java               |   26 +++++++++++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java |   37 ++++++++++++++----
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                       |   13 ++++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java              |    1 
 6 files changed, 93 insertions(+), 19 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
index b3b2fba..de9d837 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -156,6 +156,7 @@
 			processor.setRequestEvent(evt);
 			processor.setRedisCatchStorage(redisCatchStorage);
 			processor.setZlmrtpServerFactory(zlmrtpServerFactory);
+			processor.setSIPCommander(cmder);
 			return processor;
 		} else if (Request.CANCEL.equals(method)) {
 			CancelRequestProcessor processor = new CancelRequestProcessor();
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
index a14a4cc..c9ea567 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -1,16 +1,23 @@
 package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
 
+import javax.sip.address.SipURI;
 import javax.sip.Dialog;
 import javax.sip.DialogState;
 import javax.sip.InvalidArgumentException;
 import javax.sip.RequestEvent;
 import javax.sip.SipException;
+import javax.sip.header.FromHeader;
+import javax.sip.header.HeaderAddress;
+import javax.sip.header.ToHeader;
 import javax.sip.message.Response;
 
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+
+import org.apache.log4j.Logger;
 
 import java.text.ParseException;
 import java.util.HashMap;
@@ -18,12 +25,14 @@
 
 /**    
  * @Description: BYE璇锋眰澶勭悊鍣�
- * @author: swwheihei
- * @date:   2020骞�5鏈�3鏃� 涓嬪崍5:32:05     
+ * @author: lawrencehj
+ * @date:   2021骞�3鏈�9鏃�     
  */
 public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
 
-    private IRedisCatchStorage redisCatchStorage;
+	private ISIPCommander cmder;
+
+	private IRedisCatchStorage redisCatchStorage;
 
 	private ZLMRTPServerFactory zlmrtpServerFactory;
 
@@ -38,10 +47,8 @@
 			Dialog dialog = evt.getDialog();
 			if (dialog == null) return;
 			if (dialog.getState().equals(DialogState.TERMINATED)) {
-				String remoteUri = dialog.getRemoteParty().getURI().toString();
-				String localUri = dialog.getLocalParty().getURI().toString();
-				String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
-				String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
+				String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
+				String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
 				SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
 				String streamId = sendRtpItem.getStreamId();
 				Map<String, Object> param = new HashMap<>();
@@ -50,6 +57,11 @@
 				param.put("stream",streamId);
 				System.out.println("鍋滄鍚戜笂绾ф帹娴侊細" + streamId);
 				zlmrtpServerFactory.stopSendRtpStream(param);
+				redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
+				if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
+					System.out.println(streamId + "鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦");
+					cmder.streamByeCmd(streamId);
+				}
 			}
 		} catch (SipException e) {
 			e.printStackTrace();
@@ -58,8 +70,6 @@
 		} catch (ParseException e) {
 			e.printStackTrace();
 		}
-		// TODO 浼樺厛绾�99 Bye Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽棰戝仠姝㈡寚浠�
-		
 	}
 
 	/***
@@ -89,4 +99,13 @@
 	public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
 		this.zlmrtpServerFactory = zlmrtpServerFactory;
 	}
+
+	public ISIPCommander getSIPCommander() {
+		return cmder;
+	}
+
+	public void setSIPCommander(ISIPCommander cmder) {
+		this.cmder = cmder;
+	}
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 90b5369..51f61ef 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -267,20 +267,25 @@
 		}
 		
 		String streamId = json.getString("stream");
-
-		cmder.streamByeCmd(streamId);
 		StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
-		if (streamInfo!=null){
-			redisCatchStorage.stopPlay(streamInfo);
-			storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
-		}else{
-			streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
-			redisCatchStorage.stopPlayback(streamInfo);
-		}
-		
+
 		JSONObject ret = new JSONObject();
 		ret.put("code", 0);
 		ret.put("close", true);
+
+		if (streamInfo != null) {
+			if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
+				ret.put("close", false);
+			} else {
+				cmder.streamByeCmd(streamId);
+				redisCatchStorage.stopPlay(streamInfo);
+				storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+			}
+		}else{
+			cmder.streamByeCmd(streamId);
+			streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
+			redisCatchStorage.stopPlayback(streamInfo);
+		}
 		return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
 	}
 	
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
index 00951ba..1f1693d 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -153,6 +153,16 @@
     }
 
     /**
+     * 鏌ヨ杞帹鐨勬祦鏄惁鏈夊叾瀹冭鐪嬭��
+     * @param streamId
+     * @return
+     */
+    public int totalReaderCount(String streamId) {
+        JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
+        return mediaInfo.getInteger("totalReaderCount");
+    }
+
+    /**
      * 璋冪敤zlm RESTful API 鈥斺�� stopSendRtp
      */
     public Boolean stopSendRtpStream(Map<String, Object>param) {
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index ca70620..9061184 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -89,4 +89,17 @@
      */
     SendRtpItem querySendRTPServer(String platformGbId, String channelId);
 
+    /**
+     * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛�
+     * @param platformGbId
+     * @param channelId
+     */
+    void deleteSendRTPServer(String platformGbId, String channelId);
+
+    /**
+     * 鏌ヨ鏌愪釜閫氶亾鏄惁瀛樺湪涓婄骇鐐规挱锛圧TP鎺ㄩ�侊級
+     * @param channelId
+     */
+    boolean isChannelSendingRTP(String channelId);
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 6153e5f..3feb347 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -225,4 +225,30 @@
         return (SendRtpItem)redis.get(key);
     }
 
+    /**
+     * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛�
+     * @param platformGbId
+     * @param channelId
+     */
+    @Override
+    public void deleteSendRTPServer(String platformGbId, String channelId) {
+        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId;
+        redis.del(key);
+    }
+
+    /**
+     * 鏌ヨ鏌愪釜閫氶亾鏄惁瀛樺湪涓婄骇鐐规挱锛圧TP鎺ㄩ�侊級
+     * @param channelId
+     */
+    @Override
+    public boolean isChannelSendingRTP(String channelId) {
+        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId;
+        List<Object> RtpStreams = redis.scan(key);
+        if (RtpStreams.size() > 0) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
 }

--
Gitblit v1.8.0