From 5b3dc4d5957050c2ce3e3c0013337168d8c9f700 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 14 九月 2022 16:11:18 +0800
Subject: [PATCH] 优化点播结束后关闭RTPServer
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java | 4
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 2
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java | 62 +++++++++++++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 1
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 12 +-
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java | 8 +
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java | 4
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 6 +
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 21 ++++
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java | 5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 2
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java | 71 +++++++++++++++++
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 20 ++--
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 2
15 files changed, 193 insertions(+), 29 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index 7a122c7..6d52308 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -99,6 +99,12 @@
*/
public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
+
+ /**
+ * redis 娑堟伅閫氱煡骞冲彴閫氱煡璁惧鎺ㄦ祦缁撴灉
+ */
+ public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE";
+
/**
* redis 娑堟伅璇锋眰鎵�鏈夌殑鍦ㄧ嚎閫氶亾
*/
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
index 7bdeab4..0b653cf 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -12,7 +12,6 @@
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
@@ -43,7 +42,10 @@
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@Autowired
- private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
+ private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
+
+ @Autowired
+ private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
@@ -81,7 +83,7 @@
container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
+ container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
return container;
}
-
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 163467d..135afd2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -694,7 +694,7 @@
dialog = streamSession.getDialogByStream(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
}
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
- mediaServerService.closeRTPServer(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
+ mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
if (dialog == null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 706d422..a1f0aae 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -121,7 +121,7 @@
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
if (streamInfo != null) {
redisCatchStorage.stopPlay(streamInfo);
- mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
+ mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
}
SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
if (ssrcTransactionForPlay != null){
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 82b3ba4..abcffe2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -24,6 +24,7 @@
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -74,7 +75,7 @@
private DynamicTask dynamicTask;
@Autowired
- private SIPCommander cmder;
+ private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Autowired
private IPlayService playService;
@@ -556,7 +557,6 @@
otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
-
}
/**
* 閫氱煡娴佷笂绾�
@@ -639,6 +639,23 @@
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
});
+
+ // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+ redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+ if (response.getCode() != 0) {
+ dynamicTask.stop(callIdHeader.getCallId());
+ mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+ try {
+ responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+ } catch (SipException e) {
+ throw new RuntimeException(e);
+ } catch (InvalidArgumentException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
index 1aafbf6..d2db8f0 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
@@ -79,8 +79,8 @@
List<DeviceChannel> allChannels = new ArrayList<>();
// 鍥炲骞冲彴
- DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
- allChannels.add(deviceChannel);
+// DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
+// allChannels.add(deviceChannel);
// 鍥炲鐩綍
if (catalogs.size() > 0) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
index 6caff71..a5b5f19 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -139,6 +139,7 @@
param.put("stream_id", streamId);
JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param);
if (jsonObject != null ) {
+ System.out.println(jsonObject);
if (jsonObject.getInteger("code") == 0) {
result = jsonObject.getInteger("hit") == 1;
}else {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
index 55a4005..0ecc717 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -50,7 +50,9 @@
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port);
- void closeRTPServer(String deviceId, String channelId, String ssrc);
+ void closeRTPServer(MediaServerItem mediaServerItem, String streamId);
+
+ void closeRTPServer(String mediaServerId, String streamId);
void clearRTPServer(MediaServerItem mediaServerItem);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 6dcc515..8491fc5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -48,6 +48,8 @@
*/
private String mediaServerId;
+
+
public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId,
String platFormId, String platFormName, String serverId,
String mediaServerId){
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java
new file mode 100644
index 0000000..10d1b43
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java
@@ -0,0 +1,71 @@
+package com.genersoft.iot.vmp.service.bean;
+
+/**
+ * 褰搑edis鍥炲鎺ㄦ祦缁撴灉涓婄骇骞冲彴
+ * @author lin
+ */
+public class MessageForPushChannelResponse {
+ /**
+ * 閿欒鐜�
+ * 0 鎴愬姛 1 澶辫触
+ */
+ private int code;
+ /**
+ * 閿欒鍐呭
+ */
+ private String msg;
+
+ /**
+ * 娴佸簲鐢ㄥ悕
+ */
+ private String app;
+
+ /**
+ * 娴両d
+ */
+ private String stream;
+
+
+
+ public static MessageForPushChannelResponse getInstance(int code, String msg, String app, String stream){
+ MessageForPushChannelResponse messageForPushChannel = new MessageForPushChannelResponse();
+ messageForPushChannel.setCode(code);
+ messageForPushChannel.setMsg(msg);
+ messageForPushChannel.setApp(app);
+ messageForPushChannel.setStream(stream);
+ return messageForPushChannel;
+ }
+
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 00e6783..b1f8f2a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -145,7 +145,7 @@
if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
- mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
+ mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index 702967d..d00cb42 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -164,16 +164,18 @@
}
@Override
- public void closeRTPServer(String deviceId, String channelId, String stream) {
- String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream);
- String ssrc = streamSession.getSSRC(deviceId, channelId, stream);
- MediaServerItem mediaServerItem = this.getOne(mediaServerId);
- if (mediaServerItem != null) {
- String streamId = String.format("%s_%s", deviceId, channelId);
- zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
- releaseSsrc(mediaServerItem.getId(), ssrc);
+ public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) {
+ if (mediaServerItem == null) {
+ return;
}
- streamSession.remove(deviceId, channelId, stream);
+ zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
+ releaseSsrc(mediaServerItem.getId(), streamId);
+ }
+
+ @Override
+ public void closeRTPServer(String mediaServerId, String streamId) {
+ MediaServerItem mediaServerItem = this.getOne(mediaServerId);
+ closeRTPServer(mediaServerItem, streamId);
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index f3e01c9..320b168 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -270,7 +270,7 @@
logger.info("[鐐规挱瓒呮椂] 娑堟伅鏈搷搴� deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
timeoutCallback.run(0, "鐐规挱瓒呮椂");
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
- mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
@@ -333,7 +333,7 @@
});
}
// 鍏抽棴rtp server
- mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 閲嶆柊寮�鍚痵src server
mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
@@ -341,7 +341,7 @@
}
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
- mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 閲婃斁ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
@@ -445,7 +445,7 @@
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
}else {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
- mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
@@ -533,7 +533,7 @@
});
}
// 鍏抽棴rtp server
- mediaServerService.closeRTPServer(device.getDeviceId(), channelId, ssrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 閲嶆柊寮�鍚痵src server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}
@@ -593,7 +593,7 @@
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
}else {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
- mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
new file mode 100644
index 0000000..56c9ff3
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
@@ -0,0 +1,62 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+ private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+ public interface PushStreamResponseEvent{
+ void run(MessageForPushChannelResponse response);
+ }
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ //
+ logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+ MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);
+ if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+ logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+ return;
+ }
+ // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+ if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+ responseEvents.get(response.getApp() + response.getStream()).run(response);
+ }
+ }
+
+ public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+ responseEvents.put(app + stream, callback);
+ }
+
+ public void removeEvent(String app, String stream) {
+ responseEvents.remove(app + stream);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
similarity index 94%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
index 23745eb..bedbf44 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
@@ -9,7 +9,6 @@
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@@ -23,9 +22,9 @@
* @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
*/
@Component
-public class RedisPushStreamListMsgListener implements MessageListener {
+public class RedisPushStreamStatusListMsgListener implements MessageListener {
- private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
@Resource
private IMediaServerService mediaServerService;
--
Gitblit v1.8.0