From b4168c02cba462571dd3f5bdc1d0b1ffddbc938a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 16 四月 2024 00:10:38 +0800
Subject: [PATCH] 优化多wvp国标级联推流

---
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java                            |   10 
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java                         |   17 +
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java                                   |   16 +
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                               |   14 -
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java       |   97 ++++++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java    |   12 -
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java |   27 ++
 src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java                               |    1 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java         |   77 +++++-
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |  188 +++++++---------
 /dev/null                                                                                           |   97 --------
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java                              |   26 +-
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                        |   13 +
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java    |    5 
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                                |    4 
 src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java                                 |    1 
 16 files changed, 335 insertions(+), 270 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index af574b9..efe4088 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -74,6 +74,7 @@
 	public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
 	public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
 	public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
+	public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:";
 
 
 
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index fb88f54..5385efa 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -29,9 +29,6 @@
 	private RedisAlarmMsgListener redisAlarmMsgListener;
 
 	@Autowired
-	private RedisStreamMsgListener redisStreamMsgListener;
-
-	@Autowired
 	private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
 
 	@Autowired
@@ -52,6 +49,9 @@
 	@Autowired
 	private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
 
+	@Autowired
+	private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
+
 
 	/**
 	 * redis娑堟伅鐩戝惉鍣ㄥ鍣� 鍙互娣诲姞澶氫釜鐩戝惉涓嶅悓璇濋鐨剅edis鐩戝惉鍣紝鍙渶瑕佹妸娑堟伅鐩戝惉鍣ㄥ拰鐩稿簲鐨勬秷鎭闃呭鐞嗗櫒缁戝畾锛岃娑堟伅鐩戝惉鍣�
@@ -67,14 +67,14 @@
         container.setConnectionFactory(connectionFactory);
 		container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
 		container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
-		container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
 		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
 		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
-		container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM));
 		container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
+		container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED));
+		container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE));
         return container;
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index c0507df..f1744d1 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -132,6 +132,11 @@
      */
     private String receiveStream;
 
+    /**
+     * 涓婄骇鐨勭偣鎾被鍨�
+     */
+    private String sessionName;
+
     public String getIp() {
         return ip;
     }
@@ -332,6 +337,14 @@
         this.localIp = localIp;
     }
 
+    public String getSessionName() {
+        return sessionName;
+    }
+
+    public void setSessionName(String sessionName) {
+        this.sessionName = sessionName;
+    }
+
     @Override
     public String toString() {
         return "SendRtpItem{" +
@@ -347,7 +360,7 @@
                 ", stream='" + stream + '\'' +
                 ", tcp=" + tcp +
                 ", tcpActive=" + tcpActive +
-                ", localIp=" + localIp +
+                ", localIp='" + localIp + '\'' +
                 ", localPort=" + localPort +
                 ", mediaServerId='" + mediaServerId + '\'' +
                 ", serverId='" + serverId + '\'' +
@@ -360,6 +373,7 @@
                 ", rtcp=" + rtcp +
                 ", playType=" + playType +
                 ", receiveStream='" + receiveStream + '\'' +
+                ", sessionName='" + sessionName + '\'' +
                 '}';
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 242e5ef..af7db2e 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -16,7 +16,6 @@
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
@@ -78,9 +77,6 @@
 	private DynamicTask dynamicTask;
 
 	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
-	@Autowired
 	private IPlayService playService;
 
 
@@ -117,13 +113,7 @@
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
 			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-						sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-						sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-						sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-				redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
-					playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
-				});
+				redisCatchStorage.sendStartSendRtp(sendRtpItem);
 			} else {
 				JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
 				if (startSendRtpStreamResult != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index ff7427b..69f8142 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -19,7 +19,6 @@
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -98,8 +97,6 @@
 	@Autowired
 	private IStreamPushService pushService;
 
-	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -142,7 +139,7 @@
 					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
 					if (platform != null) {
 						RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
-						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
+//						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
 					}
 				}else {
 					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 3539922..f12f38a 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -122,7 +122,7 @@
     private UserSetting userSetting;
 
     @Autowired
-    private ZLMMediaListManager mediaListManager;
+    private RedisPlatformPushStreamOnlineLister mediaListManager;
 
     @Autowired
     private SipConfig config;
@@ -568,19 +568,16 @@
 
                     }
                 } else if (gbStream != null) {
-
-                    String ssrc;
-                    if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
-                        // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
-                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
-                    }else {
-                        ssrc = gb28181Sdp.getSsrc();
-                    }
                     SendRtpItem sendRtpItem = new SendRtpItem();
-                    sendRtpItem.setTcpActive(tcpActive);
+                    if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
+                        sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
+                    }
+
+                    if (tcpActive != null) {
+                        sendRtpItem.setTcpActive(tcpActive);
+                    }
                     sendRtpItem.setTcp(mediaTransmissionTCP);
                     sendRtpItem.setRtcp(platform.isRtcp());
-                    sendRtpItem.setSsrc(ssrc);
                     sendRtpItem.setPlatformName(platform.getName());
                     sendRtpItem.setPlatformId(platform.getServerGBId());
                     sendRtpItem.setMediaServerId(mediaServerItem.getId());
@@ -593,37 +590,48 @@
                     sendRtpItem.setCallId(callIdHeader.getCallId());
                     sendRtpItem.setFromTag(request.getFromTag());
                     sendRtpItem.setOnlyAudio(false);
-                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                     sendRtpItem.setStatus(0);
+                    sendRtpItem.setSessionName(sessionName);
 
                     if ("push".equals(gbStream.getStreamType())) {
                         if (streamPushItem != null) {
                             // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦
                             OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
 
-                            sendRtpItem.setServerId(pushListItem.getSeverId());
                             if (pushListItem != null) {
+                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                 StreamPushItem transform = streamPushService.transform(pushListItem);
                                 transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                 // 鎺ㄦ祦鐘舵��
-                                pushStream(sendRtpItem, mediaServerItem, platform, request);
+                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                             }else {
-                                // 鏈帹娴� 鎷夎捣
+                                if (!platform.isStartOfflinePush()) {
+                                    // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
+                                    try {
+                                        logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+                                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
+                                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
+                                    }
+                                    return;
+                                }
                                 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                             }
                         }
                     } else if ("proxy".equals(gbStream.getStreamType())) {
                         if (null != proxyByAppAndStream) {
+                            if (sendRtpItem.getSsrc() == null) {
+                                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                                String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                                sendRtpItem.setSsrc(ssrc);
+                            }
                             if (proxyByAppAndStream.isStatus()) {
-                                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                                sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
                             } else {
                                 //寮�鍚唬鐞嗘媺娴�
                                 notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                             }
                         }
-
-
                     }
                 }
             }
@@ -649,33 +657,23 @@
     /**
      * 瀹夋帓鎺ㄦ祦
      */
-    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
-                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
-                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                            String channelId, String addressStr, String ssrc, String requesterId) {
-            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
             if (streamReady != null && streamReady) {
                 // 鑷钩鍙板唴瀹�
-                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
-            if (sendRtpItem == null) {
-                logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
-                try {
-                    responseAck(request, Response.BUSY_HERE);
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage());
+                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+                if (localPort == 0) {
+                    logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+                    try {
+                        responseAck(request, Response.BUSY_HERE);
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage());
+                    }
+                    return;
                 }
-                return;
-            }
-            if (tcpActive != null) {
-                sendRtpItem.setTcpActive(tcpActive);
-            }
-            sendRtpItem.setPlayType(InviteStreamType.PUSH);
+            sendRtpItem.setPlayType(InviteStreamType.PROXY);
             // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
             sendRtpItem.setStatus(1);
-            sendRtpItem.setCallId(callIdHeader.getCallId());
-            sendRtpItem.setFromTag(request.getFromTag());
             sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
 
             SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
@@ -683,12 +681,10 @@
                 sendRtpItem.setToTag(response.getToTag());
             }
             redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
         }
-
     }
 
-    private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
         // 鎺ㄦ祦
         if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
             Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -712,6 +708,11 @@
                 if (response != null) {
                     sendRtpItem.setToTag(response.getToTag());
                 }
+                if (sendRtpItem.getSsrc() == null) {
+                    // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                    String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                    sendRtpItem.setSsrc(ssrc);
+                }
                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
 
             } else {
@@ -719,10 +720,6 @@
                 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
             }
         } else {
-            SendRtpItem sendRtpItem = new SendRtpItem();
-            sendRtpItem.setRtcp(platform.isRtcp());
-            sendRtpItem.setTcp(mediaTransmissionTCP);
-            sendRtpItem.setTcpActive();
             // 鍏朵粬骞冲彴鍐呭
             otherWvpPushStream(sendRtpItem, request, platform);
         }
@@ -733,29 +730,28 @@
      */
     private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
         // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
-        logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
+        logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream());
         // 鐩戝惉娴佷笂绾�
-        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
+        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
         zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
             OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
             logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
-            dynamicTask.stop(callIdHeader.getCallId());
-            pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+            dynamicTask.stop(sendRtpItem.getCallId());
+            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
         });
-        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
-            logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
+        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
             zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
         }, userSetting.getPlatformPlayTimeout());
-        boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
         if (!start) {
             try {
-                responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
+                responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
             } catch (SipException | InvalidArgumentException | ParseException e) {
                 logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
             }
             zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
-            dynamicTask.stop(callIdHeader.getCallId());
+            dynamicTask.stop(sendRtpItem.getCallId());
         }
     }
 
@@ -763,50 +759,28 @@
      * 閫氱煡娴佷笂绾�
      */
     private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
-        if (!platform.isStartOfflinePush()) {
-            // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
-            try {
-                logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
-                responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
-            } catch (SipException | InvalidArgumentException | ParseException e) {
-                logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
-            }
-            return;
-        }
-        // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
-        logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
-
+        // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤
+        logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream());
         MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
-                gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
-                platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
+                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
+                platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
         redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
         // 璁剧疆瓒呮椂
-        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
-            logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
+        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
             try {
-                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
-                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
+                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                 responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
             } catch (SipException | InvalidArgumentException | ParseException e) {
                 logger.error("鏈鐞嗙殑寮傚父 ", e);
             }
         }, userSetting.getPlatformPlayTimeout());
-        // 鍐欏叆redis寰呭彂娴佷俊鎭紝渚涘叾浠杦vp璇诲彇骞剁敓鎴愬彂娴佷俊鎭�
-        SendRtpItem sendRtpItemTemp = new SendRtpItem();
-        sendRtpItemTemp.setIp(addressStr);
-        sendRtpItemTemp.setPort(port);
-        sendRtpItemTemp.setSsrc(ssrc);
-        sendRtpItemTemp.setPlatformId(requesterId);
-        sendRtpItemTemp.setPlatformName(platform.getName());
-        sendRtpItemTemp.setTcp(mediaTransmissionTCP);
-        sendRtpItemTemp.setRtcp(platform.isRtcp());
-        sendRtpItemTemp.setTcpActive(tcpActive);
-        sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
-        redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
+        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
         // 娣诲姞涓婄嚎鐨勯�氱煡
-        mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
-            dynamicTask.stop(callIdHeader.getCallId());
-            redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
+        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
+            dynamicTask.stop(sendRtpItem.getCallId());
+            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
             if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
 
                 int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -823,18 +797,18 @@
                     }
                     return;
                 }
-                sendRtpItemTemp.setLocalPort(localPort);
-                sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
-                // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
-                sendRtpItemTemp.setStatus(1);
-                sendRtpItemTemp.setCallId(callIdHeader.getCallId());
-
-                sendRtpItemTemp.setFromTag(request.getFromTag());
-                SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
-                if (response != null) {
-                    sendRtpItemTemp.setToTag(response.getToTag());
+                sendRtpItem.setLocalPort(localPort);
+                if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
+                    sendRtpItem.setLocalIp(platform.getSendStreamIp());
                 }
-                redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
+
+                // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+                sendRtpItem.setStatus(1);
+                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+                if (response != null) {
+                    sendRtpItem.setToTag(response.getToTag());
+                }
+                redisCatchStorage.updateSendRTPSever(sendRtpItem);
             } else {
                 // 鍏朵粬骞冲彴鍐呭
                 otherWvpPushStream(sendRtpItemFromRedis, request, platform);
@@ -842,10 +816,10 @@
         });
 
         // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
-        redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
             if (response.getCode() != 0) {
-                dynamicTask.stop(callIdHeader.getCallId());
-                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                dynamicTask.stop(sendRtpItem.getCallId());
+                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                 try {
                     responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                 } catch (SipException | InvalidArgumentException | ParseException e) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 65685aa..346b4a2 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -18,14 +18,12 @@
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -103,7 +101,7 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
+    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
 
     @Autowired
     private ZlmHttpHookSubscribe subscribe;
@@ -129,6 +127,9 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IStreamPushService streamPushService;
 
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -236,10 +237,7 @@
                 // 閴存潈閫氳繃
                 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
             }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
         }
-
 
         HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
         result.setEnable_audio(true);
@@ -465,8 +463,7 @@
                                     || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                     || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                 param.setSeverId(userSetting.getServerId());
-                                zlmMediaListManager.addPush(param);
-
+                                streamPushService.updatePush(param);
                                 // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
                                 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                             }
@@ -483,10 +480,13 @@
                                 }
                             }
                             GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
-                            if (gbStream != null) {
-//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+                            if (gbStream == null) {
+                                storager.removeMedia(param.getApp(), param.getStream());
+                            }else {
+//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                                storager.mediaOffline(param.getApp(), param.getStream());
                             }
-                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                         }
                         GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                         if (gbStream != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
deleted file mode 100755
index cbc5fde..0000000
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package com.genersoft.iot.vmp.media.zlm;
-
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author lin
- */
-@Component
-public class ZLMMediaListManager {
-
-    private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager");
-
-    @Autowired
-    private ZLMRESTfulUtils zlmresTfulUtils;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private GbStreamMapper gbStreamMapper;
-
-    @Autowired
-    private PlatformGbStreamMapper platformGbStreamMapper;
-
-    @Autowired
-    private IStreamPushService streamPushService;
-
-    @Autowired
-    private IStreamProxyService streamProxyService;
-
-    @Autowired
-    private StreamPushMapper streamPushMapper;
-
-    @Autowired
-    private ZlmHttpHookSubscribe subscribe;
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
-
-    public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
-        StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam);
-        StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
-        transform.setPushIng(onStreamChangedHookParam.isRegist());
-        transform.setUpdateTime(DateUtil.getNow());
-        transform.setPushTime(DateUtil.getNow());
-        transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId()));
-        if (pushInDb == null) {
-            transform.setCreateTime(DateUtil.getNow());
-            streamPushMapper.add(transform);
-        }else {
-            streamPushMapper.update(transform);
-            gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId());
-        }
-        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
-        if ( channelOnlineEventLister != null)  {
-            try {
-                channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
-            } catch (ParseException e) {
-                logger.error("addPush: ", e);
-            }
-            removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
-        }
-        return transform;
-    }
-
-    public void sendStreamEvent(String app, String stream, String mediaServerId) {
-        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-        // 鏌ョ湅鎺ㄦ祦鐘舵��
-        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
-        if (streamReady != null && streamReady) {
-            ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream);
-            if (channelOnlineEventLister != null)  {
-                try {
-                    channelOnlineEventLister.run(app, stream, mediaServerId);
-                } catch (ParseException e) {
-                    logger.error("sendStreamEvent: ", e);
-                }
-                removedChannelOnlineEventLister(app, stream);
-            }
-        }
-    }
-
-    public int removeMedia(String app, String streamId) {
-        // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
-        GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
-        int result;
-        if (gbStream == null) {
-            result = storager.removeMedia(app, streamId);
-        }else {
-            result =storager.mediaOffline(app, streamId);
-        }
-        return result;
-    }
-
-    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
-        this.channelOnPublishEvents.put(app + "_" + stream, callback);
-    }
-
-    public void removedChannelOnlineEventLister(String app, String stream) {
-        this.channelOnPublishEvents.remove(app + "_" + stream);
-    }
-
-    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
-        return this.channelOnPublishEvents.get(app + "_" + stream);
-    }
-
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index 10b1eff..32a42b1 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -118,4 +118,5 @@
     Map<String, StreamPushItem> getAllAppAndStreamMap();
 
 
+    void updatePush(OnStreamChangedHookParam param);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 8c0e9b0..bc05276 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -31,7 +31,6 @@
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
@@ -133,9 +132,6 @@
     @Qualifier("taskExecutor")
     @Autowired
     private ThreadPoolTaskExecutor taskExecutor;
-
-    @Autowired
-    private RedisGbPlayMsgListener redisGbPlayMsgListener;
 
     @Autowired
     private ZlmHttpHookSubscribe hookSubscribe;
@@ -1366,15 +1362,7 @@
             param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
         }
 
-        if (mediaInfo == null) {
-            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
-                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
-            });
-        } else {
+        if (mediaInfo != null) {
             // 濡傛灉鏄弗鏍兼ā寮忥紝闇�瑕佸叧闂鍙e崰鐢�
             JSONObject startSendRtpStreamResult = null;
             if (sendRtpItem.getLocalPort() != 0) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index e2d7e68..bb51edc 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -553,4 +553,21 @@
     public Map<String, StreamPushItem> getAllAppAndStreamMap() {
         return streamPushMapper.getAllAppAndStreamMap();
     }
+
+    @Override
+    public void updatePush(OnStreamChangedHookParam param) {
+        StreamPushItem transform = transform(param);
+        StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
+        transform.setPushIng(param.isRegist());
+        transform.setUpdateTime(DateUtil.getNow());
+        transform.setPushTime(DateUtil.getNow());
+        transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
+        if (pushInDb == null) {
+            transform.setCreateTime(DateUtil.getNow());
+            streamPushMapper.add(transform);
+        }else {
+            streamPushMapper.update(transform);
+            gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
+        }
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
new file mode 100755
index 0000000..8ad0807
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
@@ -0,0 +1,97 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
+import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
+import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.text.ParseException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @author lin
+ */
+@Component
+public class RedisPlatformPushStreamOnlineLister implements MessageListener {
+
+    private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister");
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    /**
+     * 閫氳繃redis娑堟伅鎺ユ敹娴佷笂绾跨殑閫氱煡锛屽鏋滄湰鏈虹敱瀵硅繖涓祦鐨勭洃鍚紝鍒欏洖璋�
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
+                    sendStreamEvent(sendRtpItem);
+                }
+            });
+        }
+    }
+
+    private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
+
+    public void sendStreamEvent(SendRtpItem sendRtpItem) {
+        // 鏌ョ湅鎺ㄦ祦鐘舵��
+        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (channelOnlineEventLister != null)  {
+            try {
+                channelOnlineEventLister.run(sendRtpItem);
+            } catch (ParseException e) {
+                logger.error("sendStreamEvent: ", e);
+            }
+            removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
+        }
+    }
+
+    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
+        this.channelOnPublishEvents.put(app + "_" + stream, callback);
+    }
+
+    public void removedChannelOnlineEventLister(String app, String stream) {
+        this.channelOnPublishEvents.remove(app + "_" + stream);
+    }
+
+    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
+        return this.channelOnPublishEvents.get(app + "_" + stream);
+    }
+
+
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
index 14a96e8..25dd334 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
@@ -1,12 +1,16 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,6 +22,8 @@
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -32,10 +38,10 @@
     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Autowired
-    private UserSetting userSetting;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
-    private ZlmHttpHookSubscribe hookSubscribe;
+    private IMediaServerService mediaServerService;
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -52,23 +58,14 @@
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
                     try {
-                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
-                        if (messageForPushChannel == null
-                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
-                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
-                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
-                            continue;
+                        SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
+                        sendRtpItem.getMediaServerId();
+                        MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                        if (mediaServer == null) {
+                            return;
                         }
-
-                        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
-                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
-                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
-                                null);
-                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
-                            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
-
-                        });
-
+                        Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem);
+                        sendRtp(sendRtpItem, mediaServer, sendRtpParam);
 
                     }catch (Exception e) {
                         logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
@@ -78,4 +75,48 @@
             });
         }
     }
+
+    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
+        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
+        Map<String, Object> param = new HashMap<>(12);
+        param.put("vhost","__defaultVhost__");
+        param.put("app",sendRtpItem.getApp());
+        param.put("stream",sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+        param.put("dst_url",sendRtpItem.getIp());
+        param.put("dst_port", sendRtpItem.getPort());
+        param.put("src_port", sendRtpItem.getLocalPort());
+        param.put("pt", sendRtpItem.getPt());
+        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+        param.put("is_udp", isUdp);
+        if (!sendRtpItem.isTcp()) {
+            // udp妯″紡涓嬪紑鍚痳tcp淇濇椿
+            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
+        }
+        return param;
+    }
+
+    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
+        JSONObject startSendRtpStreamResult = null;
+        if (sendRtpItem.getLocalPort() != 0) {
+            if (sendRtpItem.isTcpActive()) {
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+            }else {
+                param.put("dst_url", sendRtpItem.getIp());
+                param.put("dst_port", sendRtpItem.getPort());
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+            }
+        }else {
+            if (sendRtpItem.isTcpActive()) {
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+            }else {
+                param.put("dst_url", sendRtpItem.getIp());
+                param.put("dst_port", sendRtpItem.getPort());
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+            }
+        }
+        return startSendRtpStreamResult;
+
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
index f3b415d..25600a2 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
@@ -2,12 +2,15 @@
 
 import com.alibaba.fastjson2.JSON;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -35,13 +38,25 @@
     private UserSetting userSetting;
 
     @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
     private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
 
     @Qualifier("taskExecutor")
     @Autowired
     private ThreadPoolTaskExecutor taskExecutor;
 
 
+    /**
+     * 褰撲笂绾х偣鎾椂锛岃繖閲岃礋璐g洃鍚瓑鍒版祦涓婄嚎锛屾祦涓婄嚎鍚庡鏋滄槸鍦ㄥ綋鍓嶆湇鍔″垯鐩存帴鍥炶皟锛屽鏋滄槸鍏朵粬wvp锛屽垯鐢眗edis娑堟伅杩涜閫氱煡
+     */
     @Override
     public void onMessage(Message message, byte[] bytes) {
         logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody()));
@@ -66,7 +81,17 @@
                                 null);
                         hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                             // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
-
+                            SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream());
+                            if (sendRtpItem.getSsrc() == null) {
+                                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                                sendRtpItem.setSsrc(ssrc);
+                                sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+                                sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+                                redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem);
+                                // 閫氱煡鍏朵粬wvp锛� 鐢盧edisPlatformPushStreamOnlineLister鎺ユ敹姝ょ洃鍚��
+                                redisCatchStorage.sendPushStreamOnline(sendRtpItem);
+                            }
                         });
 
 
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
deleted file mode 100755
index 7d5ba60..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
- * @author lin
- */
-@Component
-public class RedisStreamMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        if (steamMsgJson == null) {
-                            logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
-                            continue;
-                        }
-                        String serverId = steamMsgJson.getString("serverId");
-
-                        if (userSetting.getServerId().equals(serverId)) {
-                            // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
-                            continue;
-                        }
-                        logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
-                        String app = steamMsgJson.getString("app");
-                        String stream = steamMsgJson.getString("stream");
-                        boolean register = steamMsgJson.getBoolean("register");
-                        String mediaServerId = steamMsgJson.getString("mediaServerId");
-                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
-                        onStreamChangedHookParam.setSeverId(serverId);
-                        onStreamChangedHookParam.setApp(app);
-                        onStreamChangedHookParam.setStream(stream);
-                        onStreamChangedHookParam.setRegist(register);
-                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
-                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
-                        onStreamChangedHookParam.setAliveSecond(0L);
-                        onStreamChangedHookParam.setTotalReaderCount("0");
-                        onStreamChangedHookParam.setOriginType(0);
-                        onStreamChangedHookParam.setOriginTypeStr("0");
-                        onStreamChangedHookParam.setOriginTypeStr("unknown");
-                        ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
-                        if ( channelOnlineEventLister != null)  {
-                            try {
-                                channelOnlineEventLister.run(app, stream, serverId);;
-                            } catch (ParseException e) {
-                                logger.error("addPush: ", e);
-                            }
-                            zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 1e5e93d..108bc17 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -219,5 +219,9 @@
 
     void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
 
+    SendRtpItem getWaiteSendRtpItem(String app, String stream);
+
     void sendStartSendRtp(SendRtpItem sendRtpItem);
+
+    void sendPushStreamOnline(SendRtpItem sendRtpItem);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 60084df..7088837 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -686,8 +686,21 @@
     }
 
     @Override
+    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
+        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
+        return (SendRtpItem)redisTemplate.opsForValue().get(key);
+    }
+
+    @Override
     public void sendStartSendRtp(SendRtpItem sendRtpItem) {
         String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
         redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
     }
+
+    @Override
+    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
+        logger.info("[redis鍙戦�侀�氱煡] 娴佷笂绾� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
+    }
 }

--
Gitblit v1.8.0