From def56793ba7c636fbbcabad38e3ac113a3764087 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 28 六月 2023 16:52:27 +0800
Subject: [PATCH] 增加上级推流和停止推流的通知
---
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 8 ++
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 14 ++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 42 ++++++++++---
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 4 +
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java | 76 +++++++++++++++++++++++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 13 ++++
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 6 +-
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 1
9 files changed, 151 insertions(+), 14 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index 9f48426..a65491b 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -63,6 +63,7 @@
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
+// container.addMessageListener(, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
return container;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 7a26e78..f40e0c8 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,6 +3,8 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -14,6 +16,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -56,6 +59,9 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private UserSetting userSetting;
@Autowired
private IVideoManagerStorage storager;
@@ -155,6 +161,13 @@
} else if (jsonObject.getInteger("code") == 0) {
logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject);
logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
+ if (sendRtpItem.getPlayType() == InviteStreamType.PUSH) {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(),
+ sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
+ sendRtpItem.getMediaServerId());
+ messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+ redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
+ }
} else {
logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"), JSON.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index addd633..78e2956 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -1,11 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -15,6 +13,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -25,7 +24,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
@@ -52,6 +53,9 @@
private IRedisCatchStorage redisCatchStorage;
@Autowired
+ private IPlatformService platformService;
+
+ @Autowired
private IDeviceService deviceService;
@Autowired
@@ -68,6 +72,9 @@
@Autowired
private VideoStreamSessionManager streamSession;
+
+ @Autowired
+ private UserSetting userSetting;
@Override
public void afterPropertiesSet() throws Exception {
@@ -103,6 +110,19 @@
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+ if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
+ ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+ if (platform != null) {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ messageForPushChannel.setPlatFormIndex(platform.getId());
+ redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ }else {
+ logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
+ }
+ }
+
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
logger.info("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦", streamId);
@@ -119,12 +139,12 @@
logger.error("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦锛� 鍙戦�丅YE澶辫触 {}",streamId, e.getMessage());
}
}
- if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
- redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
- }
+// if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
+// MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+// sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+// sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
+// redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+// }
}
}
// 鍙兘鏄澶囦富鍔ㄥ仠姝�
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 9ffb524..e8432cc 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -19,6 +19,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
@@ -463,6 +464,13 @@
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
+ if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+ redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ }
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 886f8c2..1a9e3e5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -34,7 +34,7 @@
/**
* 璇锋眰鐨勫钩鍙拌嚜澧濱D
*/
- private String platFormIndex;
+ private int platFormIndex;
/**
* 璇锋眰骞冲彴鍚嶇О
@@ -132,11 +132,11 @@
this.mediaServerId = mediaServerId;
}
- public String getPlatFormIndex() {
+ public int getPlatFormIndex() {
return platFormIndex;
}
- public void setPlatFormIndex(String platFormIndex) {
+ public void setPlatFormIndex(int platFormIndex) {
this.platFormIndex = platFormIndex;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
index 3e2385e..8231fb3 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -129,6 +129,7 @@
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+
break;
default:
break;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
new file mode 100644
index 0000000..4e73578
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -0,0 +1,76 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰
+ * @author lin
+ */
+@Component
+public class RedisPushStreamCloseResponseListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
+
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+
+ private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+ public interface PushStreamResponseEvent{
+ void run(MessageForPushChannelResponse response);
+ }
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+ boolean isEmpty = taskQueue.isEmpty();
+ taskQueue.offer(message);
+ if (isEmpty) {
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ try {
+ MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
+ if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+ logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+ continue;
+ }
+ // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+ if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+ responseEvents.get(response.getApp() + response.getStream()).run(response);
+ }
+ }catch (Exception e) {
+ logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+ logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
+ }
+ }
+ });
+ }
+ }
+
+ public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+ responseEvents.put(app + stream, callback);
+ }
+
+ public void removeEvent(String app, String stream) {
+ responseEvents.remove(app + stream);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 42708f7..9722fa1 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -263,4 +263,8 @@
void removeAllDevice();
void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
+
+ void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
+
+ void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index d4abcb4..e54324f 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -925,4 +925,18 @@
// 浣跨敤 RedisTemplate<Object, Object> 鍙戦�佸瓧绗︿覆娑堟伅浼氬鑷村彂閫佺殑娑堟伅澶氬甫浜嗗弻寮曞彿
stringRedisTemplate.convertAndSend(key, msg.toString());
}
+
+ @Override
+ public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
+ String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
+ logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚笂绾у钩鍙拌鐪� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+ redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+ }
+
+ @Override
+ public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+ String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
+ logger.info("[redis鍙戦�侀�氱煡] 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+ redisTemplate.convertAndSend(key, JSON.toJSON(msg));
+ }
}
--
Gitblit v1.8.0