From bf6e09d231f49fb0c2cd5a81f6b31cc64d27c368 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 17 四月 2024 12:56:22 +0800
Subject: [PATCH] 修复多wvp国标级联机制

---
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java                            |    4 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |   18 ++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java                              |   73 +++++------
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                        |   15 +-
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java    |   37 ++---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java                          |    5 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java                |   57 +++++++++
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                                |    7 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java           |   76 ++++++++++++
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java               |   17 ++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java                                 |    2 
 src/main/resources/application.yml                                                                  |    2 
 12 files changed, 237 insertions(+), 76 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index 5ddaed3..dcf2830 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -42,6 +42,9 @@
 	@Autowired
 	private RedisRpcConfig redisRpcConfig;
 
+	@Autowired
+	private RedisPushStreamResponseListener redisPushStreamCloseResponseListener;
+
 
 	/**
 	 * redis娑堟伅鐩戝惉鍣ㄥ鍣� 鍙互娣诲姞澶氫釜鐩戝惉涓嶅悓璇濋鐨剅edis鐩戝惉鍣紝鍙渶瑕佹妸娑堟伅鐩戝惉鍣ㄥ拰鐩稿簲鐨勬秷鎭闃呭鐞嗗櫒缁戝畾锛岃娑堟伅鐩戝惉鍣�
@@ -61,6 +64,7 @@
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
 		container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
+		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
         return container;
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 2c134fd..2e0ecf5 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -15,9 +15,7 @@
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -135,29 +133,22 @@
 			logger.info("[鏀跺埌bye] 鍋滄鎺ㄦ祦锛歿}, 濯掍綋鑺傜偣锛� {}", streamId, sendRtpItem.getMediaServerId());
 
 			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
-				// 鏌ヨ杩欒矾娴佹槸鍚︽槸鏈钩鍙扮殑
-				StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
-				if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-					redisRpcService.stopSendRtp(sendRtpItem);
-				}else {
-					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-					redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
-							callIdHeader.getCallId(), null);
-					zlmServerFactory.stopSendRtpStream(mediaInfo, param);
-					if (userSetting.getUseCustomSsrcForParentInvite()) {
-						mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
-					}
-
-					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-					if (platform != null) {
-						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-								sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-								sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-						messageForPushChannel.setPlatFormIndex(platform.getId());
-						redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+				ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+				if (platform != null) {
+					redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+					if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+						redisRpcService.stopSendRtp(sendRtpItem);
 					}else {
-						logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
+						MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+						redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+								callIdHeader.getCallId(), null);
+						zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+						if (userSetting.getUseCustomSsrcForParentInvite()) {
+							mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
+						}
 					}
+				}else {
+					logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
 				}
 			}else {
 				MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 59ff50c..a51dd37 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -29,6 +29,7 @@
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -125,6 +126,9 @@
 
     @Autowired
     private SendRtpPortManager sendRtpPortManager;
+
+    @Autowired
+    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 
     @Override
@@ -759,6 +763,7 @@
         redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
         // 璁剧疆瓒呮椂
         dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
             logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
             try {
                 responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
@@ -801,7 +806,18 @@
                 // 鍏朵粬骞冲彴鍐呭
                 otherWvpPushStream(sendRtpItemFromRedis, request, platform);
             }
-
+        });
+        // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
+            if (response.getCode() != 0) {
+                dynamicTask.stop(sendRtpItem.getCallId());
+                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
+                try {
+                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
+                }
+            }
         });
     }
 
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 32bf76a..2a45ad8 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -10,19 +10,22 @@
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -77,6 +80,10 @@
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
+
+
+    @Autowired
+    private IRedisRpcService redisRpcService;
 
     @Autowired
     private IInviteStreamService inviteStreamService;
@@ -511,41 +518,33 @@
                             }
 
                             if (sendRtpItem.getApp().equals(param.getApp())) {
-                                logger.info(sendRtpItem.toString());
-                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
-                                    // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
-                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
-                                }else {
-                                    String platformId = sendRtpItem.getPlatformId();
-                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                    Device device = deviceService.getDevice(platformId);
-
-                                    try {
-                                        if (platform != null) {
-                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                        } else {
-                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
-                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
-                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                if (audioBroadcastCatch != null) {
-                                                    // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
-                                                    logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                }
+                                ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                                Device device = deviceService.getDevice(sendRtpItem.getPlatformId());
+                                try {
+                                    if (platform != null) {
+                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+                                    } else if (device != null) {
+                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                            if (audioBroadcastCatch != null) {
+                                                // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+                                                logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                             }
                                         }
-                                    } catch (SipException | InvalidArgumentException | ParseException |
-                                             SsrcTransactionNotFoundException e) {
-                                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+                                    }else {
+                                        // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+                                        redisRpcService.rtpSendStopped(sendRtpItem);
                                     }
+                                } catch (SipException | InvalidArgumentException | ParseException |
+                                         SsrcTransactionNotFoundException e) {
+                                    logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                 }
-
                             }
                         }
                     }
@@ -593,11 +592,7 @@
                             redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                     sendRtpItem.getCallId(), sendRtpItem.getStream());
                             if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
-                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-                                        sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
-                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+                                redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform);
                             }
                         }
                     }
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
index ec24abe..b503fab 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -372,6 +372,6 @@
         param.put("app", sendRtpItem.getApp());
         param.put("stream", sendRtpItem.getStream());
         param.put("ssrc", sendRtpItem.getSsrc());
-        return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
+        return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
index a601ae9..8d1b7f0 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -13,4 +13,9 @@
     void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
 
     WVPResult stopSendRtp(SendRtpItem sendRtpItem);
+
+    void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
+
+    void rtpSendStopped(SendRtpItem sendRtpItem);
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
new file mode 100644
index 0000000..c90771b
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -0,0 +1,76 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
+    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+    public interface PushStreamResponseEvent{
+        void run(MessageForPushChannelResponse response);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
+                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+                            logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+                            continue;
+                        }
+                        // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+                            responseEvents.get(response.getApp() + response.getStream()).run(response);
+                        }
+                    }catch (Exception e) {
+                        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+                        logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
+                    }
+                }
+            });
+        }
+    }
+
+    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+        responseEvents.put(app + stream, callback);
+    }
+
+    public void removeEvent(String app, String stream) {
+        responseEvents.remove(app + stream);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
index 7a81eab..682f564 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -7,8 +7,10 @@
 import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
 import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
 import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -18,12 +20,17 @@
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
 
 /**
  * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢�
@@ -57,6 +64,14 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+
+    @Autowired
+    private ISIPCommanderForPlatform commanderFroPlatform;
+
+
+    @Autowired
+    private IVideoManagerStorage storager;
 
 
     /**
@@ -133,6 +148,20 @@
         return null;
     }
 
+    /**
+     * 鍋滄鐩戝惉娴佷笂绾�
+     */
+    public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+
+        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        hookSubscribe.removeSubscribe(hook);
+        return null;
+    }
+
 
     /**
      * 寮�濮嬪彂娴�
@@ -194,6 +223,34 @@
         return response;
     }
 
+    /**
+     * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜�
+     */
+    public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId());
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (sendRtpItemInCatch == null) {
+            return response;
+        }
+        String platformId = sendRtpItem.getPlatformId();
+        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+        if (platform == null) {
+            return response;
+        }
+        try {
+            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+            redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+        } catch (SipException | InvalidArgumentException | ParseException e) {
+            logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+        }
+        return response;
+    }
+
     private void sendResponse(RedisRpcResponse response){
         response.setToId(userSetting.getServerId());
         RedisRpcMessage message = new RedisRpcMessage();
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
index e9a00a9..f11b9aa 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -97,4 +97,21 @@
         });
 
     }
+
+    @Override
+    public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        hookSubscribe.removeSubscribe(hook);
+        RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        redisRpcConfig.request(request, 10);
+    }
+
+    @Override
+    public void rtpSendStopped(SendRtpItem sendRtpItem) {
+        RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        redisRpcConfig.request(request, 10);
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 108bc17..0cf6c39 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -2,10 +2,7 @@
 
 import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.SystemAllInfo;
-import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -207,7 +204,7 @@
 
     void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
 
-    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
+    void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
 
     void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
 
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 60ebfab..d1e8052 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -5,10 +5,7 @@
 import com.genersoft.iot.vmp.common.SystemAllInfo;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -644,9 +641,15 @@
     }
 
     @Override
-    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+    public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
+
+        MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
+                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+        msg.setPlatFormIndex(platform.getId());
+
         String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
-        logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
         redisTemplate.convertAndSend(key, JSON.toJSON(msg));
     }
 
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 3f47844..69f947e 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -2,4 +2,4 @@
   application:
     name: wvp
   profiles:
-    active: local
\ No newline at end of file
+    active: local2
\ No newline at end of file

--
Gitblit v1.8.0