From 3291c4b2e67d510186ca5fbfac8ec5af1a9d4f16 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 21 三月 2024 16:54:44 +0800
Subject: [PATCH] 修复多平台推流无人观看redis通知
---
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java | 10 ++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 2
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 116 ++++++++++++++++++----------
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 2
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java | 49 ++++++++++++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java | 2
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 50 ++++++++++++
8 files changed, 186 insertions(+), 47 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 5de1f19..ff7427b 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -15,8 +15,11 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
+import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -92,6 +95,12 @@
@Autowired
private UserSetting userSetting;
+ @Autowired
+ private IStreamPushService pushService;
+
+ @Autowired
+ private RedisGbPlayMsgListener redisGbPlayMsgListener;
+
@Override
public void afterPropertiesSet() throws Exception {
// 娣诲姞娑堟伅澶勭悊鐨勮闃�
@@ -124,58 +133,81 @@
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[鏀跺埌bye] 鍋滄鎺ㄦ祦锛歿}, 濯掍綋鑺傜偣锛� {}", streamId, sendRtpItem.getMediaServerId());
- MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
- callIdHeader.getCallId(), null);
- zlmServerFactory.stopSendRtpStream(mediaInfo, param);
- if (userSetting.getUseCustomSsrcForParentInvite()) {
- mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
- }
+
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
- ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
- if (platform != null) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(platform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ // 鏌ヨ杩欒矾娴佹槸鍚︽槸鏈钩鍙扮殑
+ StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
+ if (push!= null && !push.isSelf()) {
+ // 涓嶆槸鏈钩鍙扮殑灏卞彂閫乺edis娑堟伅璁╁叾浠杦vp鍋滄鍙戞祦
+ ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+ if (platform != null) {
+ RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
+ redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
+ }
}else {
- logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
+ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+ callIdHeader.getCallId(), null);
+ zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+ if (userSetting.getUseCustomSsrcForParentInvite()) {
+ mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
+ }
+
+ ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+ if (platform != null) {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ messageForPushChannel.setPlatFormIndex(platform.getId());
+ redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ }else {
+ logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
+ }
+ }
+ }else {
+ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+ callIdHeader.getCallId(), null);
+ zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+ if (userSetting.getUseCustomSsrcForParentInvite()) {
+ mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
}
+ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ if (mediaInfo != null) {
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
+ // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+ logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ }
- AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
- // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
- logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- }
-
- int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
- if (totalReaderCount <= 0) {
- logger.info("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦", streamId);
- if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
- Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
- if (device == null) {
- logger.info("[鏀跺埌bye] {} 閫氱煡璁惧鍋滄鎺ㄦ祦鏃舵湭鎵惧埌璁惧淇℃伅", streamId);
- }
- try {
- logger.info("[鍋滄鐐规挱] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
- } catch (InvalidArgumentException | ParseException | SipException |
- SsrcTransactionNotFoundException e) {
- logger.error("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦锛� 鍙戦�丅YE澶辫触 {}",streamId, e.getMessage());
+ int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
+ if (totalReaderCount <= 0) {
+ logger.info("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦", streamId);
+ if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
+ Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
+ if (device == null) {
+ logger.info("[鏀跺埌bye] {} 閫氱煡璁惧鍋滄鎺ㄦ祦鏃舵湭鎵惧埌璁惧淇℃伅", streamId);
+ }
+ try {
+ logger.info("[鍋滄鐐规挱] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
+ } catch (InvalidArgumentException | ParseException | SipException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦锛� 鍙戦�丅YE澶辫触 {}",streamId, e.getMessage());
+ }
}
}
}
}
- // 鍙兘鏄澶囧彂閫佺殑鍋滄
- SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
- if (ssrcTransaction == null) {
- return;
- }
- logger.info("[鏀跺埌bye] 鏉ヨ嚜璁惧锛歿}, 閫氶亾宸插仠姝㈡帹娴�: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
+ // 鍙兘鏄澶囧彂閫佺殑鍋滄
+ SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
+ if (ssrcTransaction == null) {
+ return;
+ }
+ logger.info("[鏀跺埌bye] 鏉ヨ嚜璁惧锛歿}, 閫氶亾宸插仠姝㈡帹娴�: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
ParentPlatform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId());
if (platform != null ) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 7aed575..9346eb8 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -579,7 +579,7 @@
}
// 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ��
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
- List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
inviteInfo.getChannelId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
new file mode 100755
index 0000000..fcba511
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
@@ -0,0 +1,49 @@
+package com.genersoft.iot.vmp.service.bean;
+
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
+
+public class RequestStopPushStreamMsg {
+
+
+ private SendRtpItem sendRtpItem;
+
+
+ private String platformName;
+
+
+ private int platFormIndex;
+
+ public SendRtpItem getSendRtpItem() {
+ return sendRtpItem;
+ }
+
+ public void setSendRtpItem(SendRtpItem sendRtpItem) {
+ this.sendRtpItem = sendRtpItem;
+ }
+
+ public String getPlatformName() {
+ return platformName;
+ }
+
+ public void setPlatformName(String platformName) {
+ this.platformName = platformName;
+ }
+
+
+ public int getPlatFormIndex() {
+ return platFormIndex;
+ }
+
+ public void setPlatFormIndex(int platFormIndex) {
+ this.platFormIndex = platFormIndex;
+ }
+
+ public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
+ RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
+ streamMsg.setSendRtpItem(sendRtpItem);
+ streamMsg.setPlatformName(platformName);
+ streamMsg.setPlatFormIndex(platFormIndex);
+ return streamMsg;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
index cb11886..e9ee4cb 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
@@ -6,7 +6,17 @@
public class WvpRedisMsgCmd {
+ /**
+ * 璇锋眰鑾峰彇鎺ㄦ祦淇℃伅
+ */
public static final String GET_SEND_ITEM = "GetSendItem";
+ /**
+ * 璇锋眰鎺ㄦ祦鐨勮姹�
+ */
public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
+ /**
+ * 鍋滄鎺ㄦ祦鐨勮姹�
+ */
+ public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream";
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
index eb261e3..3b990f0 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -133,7 +133,10 @@
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-
+ break;
+ case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
+ RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
+ requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break;
default:
break;
@@ -397,6 +400,19 @@
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
+ /**
+ * 鍙戦�佽姹傛帹娴佺殑娑堟伅
+ */
+ public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
+ String key = UUID.randomUUID().toString();
+ WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
+ WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
+
+ JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
+ logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject);
+ redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+ }
+
private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
if (platformGbId == null) {
platformGbId = "*";
@@ -423,4 +439,36 @@
return null;
}
}
+
+ /**
+ * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
+ */
+ private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
+ SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
+ if (sendRtpItem == null) {
+ logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL");
+ return;
+ }
+ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ if (mediaInfo == null) {
+ // TODO 鍥炲閿欒
+ return;
+ }
+ Map<String, Object> param = new HashMap<>();
+ param.put("vhost","__defaultVhost__");
+ param.put("app",sendRtpItem.getApp());
+ param.put("stream",sendRtpItem.getStream());
+ param.put("ssrc", sendRtpItem.getSsrc());
+
+ if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
+ logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ // 鍙戦�乺edis娑堟伅
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
+ redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ }
+
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
index 1d7c2fd..fe0ccd2 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -73,7 +73,7 @@
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
if (push != null) {
- List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
push.getGbId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index b663c5c..78fd280 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -181,7 +181,7 @@
*/
void sendStreamPushRequestedMsgForStatus();
- List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
+ List<SendRtpItem> querySendRTPServerByChannelId(String channelId);
List<SendRtpItem> querySendRTPServerByStream(String stream);
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 27bbdba..18a037d 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -184,7 +184,7 @@
}
@Override
- public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
+ public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
if (channelId == null) {
return null;
}
--
Gitblit v1.8.0