From a71063dd1fc25d99486b36ba65c3081a3c8c7c01 Mon Sep 17 00:00:00 2001
From: lawrencehj <1934378145@qq.com>
Date: 星期日, 14 三月 2021 21:20:47 +0800
Subject: [PATCH] 增加上级点播停止后通知设备停止推流功能,并自动与本地播放协同
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 25 +++++++-----
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 10 +++++
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 26 +++++++++++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java | 37 ++++++++++++++----
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 13 ++++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java | 1
6 files changed, 93 insertions(+), 19 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
index b3b2fba..de9d837 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -156,6 +156,7 @@
processor.setRequestEvent(evt);
processor.setRedisCatchStorage(redisCatchStorage);
processor.setZlmrtpServerFactory(zlmrtpServerFactory);
+ processor.setSIPCommander(cmder);
return processor;
} else if (Request.CANCEL.equals(method)) {
CancelRequestProcessor processor = new CancelRequestProcessor();
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
index a14a4cc..c9ea567 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -1,16 +1,23 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
+import javax.sip.address.SipURI;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
+import javax.sip.header.FromHeader;
+import javax.sip.header.HeaderAddress;
+import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+
+import org.apache.log4j.Logger;
import java.text.ParseException;
import java.util.HashMap;
@@ -18,12 +25,14 @@
/**
* @Description: BYE璇锋眰澶勭悊鍣�
- * @author: swwheihei
- * @date: 2020骞�5鏈�3鏃� 涓嬪崍5:32:05
+ * @author: lawrencehj
+ * @date: 2021骞�3鏈�9鏃�
*/
public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
- private IRedisCatchStorage redisCatchStorage;
+ private ISIPCommander cmder;
+
+ private IRedisCatchStorage redisCatchStorage;
private ZLMRTPServerFactory zlmrtpServerFactory;
@@ -38,10 +47,8 @@
Dialog dialog = evt.getDialog();
if (dialog == null) return;
if (dialog.getState().equals(DialogState.TERMINATED)) {
- String remoteUri = dialog.getRemoteParty().getURI().toString();
- String localUri = dialog.getLocalParty().getURI().toString();
- String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
- String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
+ String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
+ String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
@@ -50,6 +57,11 @@
param.put("stream",streamId);
System.out.println("鍋滄鍚戜笂绾ф帹娴侊細" + streamId);
zlmrtpServerFactory.stopSendRtpStream(param);
+ redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
+ if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
+ System.out.println(streamId + "鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦");
+ cmder.streamByeCmd(streamId);
+ }
}
} catch (SipException e) {
e.printStackTrace();
@@ -58,8 +70,6 @@
} catch (ParseException e) {
e.printStackTrace();
}
- // TODO 浼樺厛绾�99 Bye Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽棰戝仠姝㈡寚浠�
-
}
/***
@@ -89,4 +99,13 @@
public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
this.zlmrtpServerFactory = zlmrtpServerFactory;
}
+
+ public ISIPCommander getSIPCommander() {
+ return cmder;
+ }
+
+ public void setSIPCommander(ISIPCommander cmder) {
+ this.cmder = cmder;
+ }
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 90b5369..51f61ef 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -267,20 +267,25 @@
}
String streamId = json.getString("stream");
-
- cmder.streamByeCmd(streamId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
- if (streamInfo!=null){
- redisCatchStorage.stopPlay(streamInfo);
- storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
- }else{
- streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
- redisCatchStorage.stopPlayback(streamInfo);
- }
-
+
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", true);
+
+ if (streamInfo != null) {
+ if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
+ ret.put("close", false);
+ } else {
+ cmder.streamByeCmd(streamId);
+ redisCatchStorage.stopPlay(streamInfo);
+ storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
+ }
+ }else{
+ cmder.streamByeCmd(streamId);
+ streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
+ redisCatchStorage.stopPlayback(streamInfo);
+ }
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
index 00951ba..1f1693d 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -153,6 +153,16 @@
}
/**
+ * 鏌ヨ杞帹鐨勬祦鏄惁鏈夊叾瀹冭鐪嬭��
+ * @param streamId
+ * @return
+ */
+ public int totalReaderCount(String streamId) {
+ JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
+ return mediaInfo.getInteger("totalReaderCount");
+ }
+
+ /**
* 璋冪敤zlm RESTful API 鈥斺�� stopSendRtp
*/
public Boolean stopSendRtpStream(Map<String, Object>param) {
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index ca70620..9061184 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -89,4 +89,17 @@
*/
SendRtpItem querySendRTPServer(String platformGbId, String channelId);
+ /**
+ * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛�
+ * @param platformGbId
+ * @param channelId
+ */
+ void deleteSendRTPServer(String platformGbId, String channelId);
+
+ /**
+ * 鏌ヨ鏌愪釜閫氶亾鏄惁瀛樺湪涓婄骇鐐规挱锛圧TP鎺ㄩ�侊級
+ * @param channelId
+ */
+ boolean isChannelSendingRTP(String channelId);
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 6153e5f..3feb347 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -225,4 +225,30 @@
return (SendRtpItem)redis.get(key);
}
+ /**
+ * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛�
+ * @param platformGbId
+ * @param channelId
+ */
+ @Override
+ public void deleteSendRTPServer(String platformGbId, String channelId) {
+ String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId;
+ redis.del(key);
+ }
+
+ /**
+ * 鏌ヨ鏌愪釜閫氶亾鏄惁瀛樺湪涓婄骇鐐规挱锛圧TP鎺ㄩ�侊級
+ * @param channelId
+ */
+ @Override
+ public boolean isChannelSendingRTP(String channelId) {
+ String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId;
+ List<Object> RtpStreams = redis.scan(key);
+ if (RtpStreams.size() > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
}
--
Gitblit v1.8.0