From 5b3dc4d5957050c2ce3e3c0013337168d8c9f700 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 14 九月 2022 16:11:18 +0800
Subject: [PATCH] 优化点播结束后关闭RTPServer

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java |    4 
 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java                                                   |    2 
 src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java                                     |   62 +++++++++++++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java                                           |    2 
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java                                                    |    1 
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                                                     |   12 +-
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java                                                           |    8 +
 src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java                                                      |    4 
 src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java                                                     |    6 +
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java                       |   21 ++++
 src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java                                |    5 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java                          |    2 
 src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java                                       |   71 +++++++++++++++++
 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java                                              |   20 ++--
 src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java                                               |    2 
 15 files changed, 193 insertions(+), 29 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index 7a122c7..6d52308 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -99,6 +99,12 @@
 	 */
 	public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
 
+
+	/**
+	 * redis 娑堟伅閫氱煡骞冲彴閫氱煡璁惧鎺ㄦ祦缁撴灉
+	 */
+	public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE";
+
 	/**
 	 * redis 娑堟伅璇锋眰鎵�鏈夌殑鍦ㄧ嚎閫氶亾
 	 */
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
index 7bdeab4..0b653cf 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -12,7 +12,6 @@
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.listener.PatternTopic;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
 import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
@@ -43,7 +42,10 @@
 	private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
 
 	@Autowired
-	private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
+	private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
+
+	@Autowired
+	private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 	@Bean
 	public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
@@ -81,7 +83,7 @@
 		container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
+		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
         return container;
     }
-
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 163467d..135afd2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -694,7 +694,7 @@
 				dialog = streamSession.getDialogByStream(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
 			}
 			mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
-			mediaServerService.closeRTPServer(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
+			mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
 			streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
 
 			if (dialog == null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 706d422..a1f0aae 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -121,7 +121,7 @@
 					StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
 					if (streamInfo != null) {
 						redisCatchStorage.stopPlay(streamInfo);
-						mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
+						mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
 					}
 					SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
 					if (ssrcTransactionForPlay != null){
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 82b3ba4..abcffe2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -24,6 +24,7 @@
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -74,7 +75,7 @@
     private DynamicTask dynamicTask;
 
     @Autowired
-    private SIPCommander cmder;
+    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
     @Autowired
     private IPlayService playService;
@@ -556,7 +557,6 @@
             otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                     mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
         }
-
     }
     /**
      * 閫氱煡娴佷笂绾�
@@ -639,6 +639,23 @@
                             mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                 }
             });
+
+            // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+                if (response.getCode() != 0) {
+                    dynamicTask.stop(callIdHeader.getCallId());
+                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                    try {
+                        responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+                    } catch (SipException e) {
+                        throw new RuntimeException(e);
+                    } catch (InvalidArgumentException e) {
+                        throw new RuntimeException(e);
+                    } catch (ParseException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
         }
     }
 
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
index 1aafbf6..d2db8f0 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
@@ -79,8 +79,8 @@
             List<DeviceChannel> allChannels = new ArrayList<>();
 
             // 鍥炲骞冲彴
-            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
-            allChannels.add(deviceChannel);
+//            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
+//            allChannels.add(deviceChannel);
 
             // 鍥炲鐩綍
             if (catalogs.size() > 0) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
index 6caff71..a5b5f19 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -139,6 +139,7 @@
             param.put("stream_id", streamId);
             JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param);
             if (jsonObject != null ) {
+                System.out.println(jsonObject);
                 if (jsonObject.getInteger("code") == 0) {
                     result = jsonObject.getInteger("hit") == 1;
                 }else {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
index 55a4005..0ecc717 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -50,7 +50,9 @@
 
     SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port);
 
-    void closeRTPServer(String deviceId, String channelId, String ssrc);
+    void closeRTPServer(MediaServerItem mediaServerItem, String streamId);
+
+    void closeRTPServer(String mediaServerId, String streamId);
 
     void clearRTPServer(MediaServerItem mediaServerItem);
 
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 6dcc515..8491fc5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -48,6 +48,8 @@
      */
     private String mediaServerId;
 
+
+
     public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId,
                                                     String platFormId, String platFormName, String serverId,
                                                     String mediaServerId){
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java
new file mode 100644
index 0000000..10d1b43
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java
@@ -0,0 +1,71 @@
+package com.genersoft.iot.vmp.service.bean;
+
+/**
+ * 褰搑edis鍥炲鎺ㄦ祦缁撴灉涓婄骇骞冲彴
+ * @author lin
+ */
+public class MessageForPushChannelResponse {
+    /**
+     * 閿欒鐜�
+     * 0 鎴愬姛 1 澶辫触
+     */
+    private int code;
+    /**
+     * 閿欒鍐呭
+     */
+    private String msg;
+
+    /**
+     * 娴佸簲鐢ㄥ悕
+     */
+    private String app;
+
+    /**
+     * 娴両d
+     */
+    private String stream;
+
+
+
+    public static MessageForPushChannelResponse getInstance(int code, String msg, String app, String stream){
+        MessageForPushChannelResponse messageForPushChannel = new MessageForPushChannelResponse();
+        messageForPushChannel.setCode(code);
+        messageForPushChannel.setMsg(msg);
+        messageForPushChannel.setApp(app);
+        messageForPushChannel.setStream(stream);
+        return messageForPushChannel;
+    }
+
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 00e6783..b1f8f2a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -145,7 +145,7 @@
         if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
             for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
                 mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
-                mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
+                mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
                 streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
             }
         }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index 702967d..d00cb42 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -164,16 +164,18 @@
     }
 
     @Override
-    public void closeRTPServer(String deviceId, String channelId, String stream) {
-        String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream);
-        String ssrc = streamSession.getSSRC(deviceId, channelId, stream);
-        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
-        if (mediaServerItem != null) {
-            String streamId = String.format("%s_%s", deviceId, channelId);
-            zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
-            releaseSsrc(mediaServerItem.getId(), ssrc);
+    public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) {
+        if (mediaServerItem == null) {
+            return;
         }
-        streamSession.remove(deviceId, channelId, stream);
+        zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
+        releaseSsrc(mediaServerItem.getId(), streamId);
+    }
+
+    @Override
+    public void closeRTPServer(String mediaServerId, String streamId) {
+        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
+        closeRTPServer(mediaServerItem, streamId);
     }
 
     @Override
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index f3e01c9..320b168 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -270,7 +270,7 @@
                 logger.info("[鐐规挱瓒呮椂] 娑堟伅鏈搷搴� deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                 timeoutCallback.run(0, "鐐规挱瓒呮椂");
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
                 streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
             }
         }, userSetting.getPlayTimeout());
@@ -333,7 +333,7 @@
                                 });
                     }
                     // 鍏抽棴rtp server
-                    mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+                    mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
                     // 閲嶆柊寮�鍚痵src server
                     mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
 
@@ -341,7 +341,7 @@
             }
         }, (event) -> {
             dynamicTask.stop(timeOutTaskKey);
-            mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+            mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
             // 閲婃斁ssrc
             mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
 
@@ -445,7 +445,7 @@
                 cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
             }else {
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                 streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
             }
             cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
@@ -533,7 +533,7 @@
                                     });
                                 }
                                 // 鍏抽棴rtp server
-                                mediaServerService.closeRTPServer(device.getDeviceId(), channelId, ssrcInfo.getStream());
+                                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                                 // 閲嶆柊寮�鍚痵src server
                                 mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
                             }
@@ -593,7 +593,7 @@
                 cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
             }else {
                 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
-                mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
+                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                 streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
             }
             cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
new file mode 100644
index 0000000..56c9ff3
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
@@ -0,0 +1,62 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+    public interface PushStreamResponseEvent{
+        void run(MessageForPushChannelResponse response);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        //
+        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+        MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);
+        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+            logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+            return;
+        }
+        // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+            responseEvents.get(response.getApp() + response.getStream()).run(response);
+        }
+    }
+
+    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+        responseEvents.put(app + stream, callback);
+    }
+
+    public void removeEvent(String app, String stream) {
+        responseEvents.remove(app + stream);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
similarity index 94%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
index 23745eb..bedbf44 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
@@ -9,7 +9,6 @@
 import com.genersoft.iot.vmp.utils.DateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
 import org.springframework.stereotype.Component;
@@ -23,9 +22,9 @@
  * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
  */
 @Component
-public class RedisPushStreamListMsgListener implements MessageListener {
+public class RedisPushStreamStatusListMsgListener implements MessageListener {
 
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
     @Resource
     private IMediaServerService mediaServerService;
 

--
Gitblit v1.8.0