From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 16 四月 2024 22:10:35 +0800
Subject: [PATCH] 重构多wvp国标级联机制

---
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java                            |   18 -
 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java                            |   24 +
 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java                            |   93 +++++
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java                                  |  205 ++++++++++++
 src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java                                |    2 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java    |   16 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |   45 --
 /dev/null                                                                                           |   76 ----
 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java                           |   87 +++++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java                              |    4 
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                        |    7 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java    |   14 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java                          |   16 +
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java                |  203 ++++++++++++
 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java                        |   21 +
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java               |  100 ++++++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java                                 |    9 
 17 files changed, 789 insertions(+), 151 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index 5385efa..5ddaed3 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -34,23 +34,13 @@
 	@Autowired
 	private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
 
-	@Autowired
-	private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 	@Autowired
 	private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
 
-	@Autowired
-	private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
 
 	@Autowired
-	private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener;
-
-	@Autowired
-	private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
-
-	@Autowired
-	private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
+	private RedisRpcConfig redisRpcConfig;
 
 
 	/**
@@ -69,12 +59,8 @@
 		container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
-		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
-		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
-		container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
-		container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED));
-		container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE));
+		container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
         return container;
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
new file mode 100644
index 0000000..545fdc1
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
@@ -0,0 +1,205 @@
+package com.genersoft.iot.vmp.conf.redis;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisRpcConfig implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
+
+    public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
+
+    private final Random random = new Random();
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private RedisRpcController redisRpcController;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
+                        if (redisRpcMessage.getRequest() != null) {
+                            handlerRequest(redisRpcMessage.getRequest());
+                        } else if (redisRpcMessage.getResponse() != null){
+                            handlerResponse(redisRpcMessage.getResponse());
+                        } else {
+                            logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage));
+                        }
+                    } catch (Exception e) {
+                        logger.error("[redis rpc 瑙f瀽寮傚父] ", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private void handlerResponse(RedisRpcResponse response) {
+        if (userSetting.getServerId().equals(response.getToId())) {
+            return;
+        }
+        response(response);
+    }
+
+    private void handlerRequest(RedisRpcRequest request) {
+        try {
+            if (userSetting.getServerId().equals(request.getFromId())) {
+                return;
+            }
+            Method method = getMethod(request.getUri());
+            // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404
+            if (userSetting.getServerId().equals(request.getToId())) {
+                if (method == null) {
+                    // 鍥炲404缁撴灉
+                    RedisRpcResponse response = request.getResponse();
+                    response.setStatusCode(404);
+                    sendResponse(response);
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if(response != null) {
+                    sendResponse(response);
+                }
+            }else {
+                if (method == null) {
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if (response != null) {
+                    sendResponse(response);
+                }
+            }
+        }catch (InvocationTargetException | IllegalAccessException e) {
+            logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e);
+        }
+
+    }
+
+    private Method getMethod(String name) {
+        // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ
+        Method[] methods = redisRpcController.getClass().getMethods();
+        for (Method method : methods) {
+            if (method.getName().equals(name)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+    private void sendRequest(RedisRpcRequest request){
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setRequest(request);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+
+    private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
+    private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
+
+    public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
+        try {
+            sendRequest(request);
+            return subscribe.poll(timeOut, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
+        } finally {
+            this.unsubscribe(request.getSn());
+        }
+        return null;
+    }
+
+    public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        setCallback(request.getSn(), callback);
+        sendRequest(request);
+    }
+
+    public Boolean response(RedisRpcResponse response) {
+        SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
+        CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
+        if (queue != null) {
+            try {
+                return queue.offer(response, 2, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                logger.error("{}", e.getMessage(), e);
+            }
+        }else if (callback != null) {
+            callback.run(response);
+            callbacks.remove(response.getSn());
+        }
+        return false;
+    }
+
+    private void unsubscribe(long key) {
+        topicSubscribers.remove(key);
+    }
+
+
+    private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
+        SynchronousQueue<RedisRpcResponse> queue = null;
+        if (!topicSubscribers.containsKey(key))
+            topicSubscribers.put(key, queue = new SynchronousQueue<>());
+        return queue;
+    }
+
+    private void setCallback(long key, CommonCallback<RedisRpcResponse> callback)  {
+        if (!callbacks.containsKey(key)) {
+            callbacks.put(key, callback);
+        }
+
+    }
+
+    public void removeCallback(long key)  {
+        callbacks.remove(key);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
new file mode 100644
index 0000000..061df6f
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
@@ -0,0 +1,24 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+public class RedisRpcMessage {
+
+    private RedisRpcRequest request;
+
+    private RedisRpcResponse response;
+
+    public RedisRpcRequest getRequest() {
+        return request;
+    }
+
+    public void setRequest(RedisRpcRequest request) {
+        this.request = request;
+    }
+
+    public RedisRpcResponse getResponse() {
+        return response;
+    }
+
+    public void setResponse(RedisRpcResponse response) {
+        this.response = response;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
new file mode 100644
index 0000000..e31eb45
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
@@ -0,0 +1,93 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 閫氳繃redis鍙戦�佽姹�
+ */
+public class RedisRpcRequest {
+
+    /**
+     * 鏉ヨ嚜鐨刉VP ID
+     */
+    private String fromId;
+
+
+    /**
+     * 鐩爣鐨刉VP ID
+     */
+    private String toId;
+
+    /**
+     * 搴忓垪鍙�
+     */
+    private long sn;
+
+    /**
+     * 璁块棶鐨勮矾寰�
+     */
+    private String uri;
+
+    /**
+     * 鍙傛暟
+     */
+    private Object param;
+
+    public String getFromId() {
+        return fromId;
+    }
+
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    public String getToId() {
+        return toId;
+    }
+
+    public void setToId(String toId) {
+        this.toId = toId;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Object getParam() {
+        return param;
+    }
+
+    public void setParam(Object param) {
+        this.param = param;
+    }
+
+    public long getSn() {
+        return sn;
+    }
+
+    public void setSn(long sn) {
+        this.sn = sn;
+    }
+
+    @Override
+    public String toString() {
+        return "RedisRpcRequest{" +
+                "fromId='" + fromId + '\'' +
+                ", toId='" + toId + '\'' +
+                ", sn='" + sn + '\'' +
+                ", uri='" + uri + '\'' +
+                ", param=" + param +
+                '}';
+    }
+
+    public RedisRpcResponse getResponse() {
+        RedisRpcResponse response = new RedisRpcResponse();
+        response.setFromId(fromId);
+        response.setToId(toId);
+        response.setSn(sn);
+        response.setUri(uri);
+        return response;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
new file mode 100644
index 0000000..ef94816
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
@@ -0,0 +1,87 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 閫氳繃redis鍙戦�佸洖澶�
+ */
+public class RedisRpcResponse {
+
+    /**
+     * 鏉ヨ嚜鐨刉VP ID
+     */
+    private String fromId;
+
+
+    /**
+     * 鐩爣鐨刉VP ID
+     */
+    private String toId;
+
+
+    /**
+     * 搴忓垪鍙�
+     */
+    private long sn;
+
+    /**
+     * 鐘舵�佺爜
+     */
+    private int statusCode;
+
+    /**
+     * 璁块棶鐨勮矾寰�
+     */
+    private String uri;
+
+    /**
+     * 鍙傛暟
+     */
+    private Object body;
+
+    public String getFromId() {
+        return fromId;
+    }
+
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    public String getToId() {
+        return toId;
+    }
+
+    public void setToId(String toId) {
+        this.toId = toId;
+    }
+
+    public long getSn() {
+        return sn;
+    }
+
+    public void setSn(long sn) {
+        this.sn = sn;
+    }
+
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public void setStatusCode(int statusCode) {
+        this.statusCode = statusCode;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Object getBody() {
+        return body;
+    }
+
+    public void setBody(Object body) {
+        this.body = body;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index af7db2e..b748451 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -15,9 +15,11 @@
 import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
-import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -54,6 +56,8 @@
 
 	@Autowired
     private IRedisCatchStorage redisCatchStorage;
+	@Autowired
+    private IRedisRpcService redisRpcService;
 
 	@Autowired
     private UserSetting userSetting;
@@ -113,7 +117,15 @@
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
 			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-				redisCatchStorage.sendStartSendRtp(sendRtpItem);
+//				redisCatchStorage.sendStartSendRtp(sendRtpItem);
+				WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem);
+				if (wvpResult.getCode() == 0) {
+					MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
+							sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
+							sendRtpItem.getMediaServerId());
+					messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+					redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
+				}
 			} else {
 				JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
 				if (startSendRtpStreamResult != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 69f8142..2c134fd 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -18,7 +18,7 @@
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -97,6 +97,9 @@
 	@Autowired
 	private IStreamPushService pushService;
 
+	@Autowired
+	private IRedisRpcService redisRpcService;
+
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -134,13 +137,8 @@
 			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
 				// 鏌ヨ杩欒矾娴佹槸鍚︽槸鏈钩鍙扮殑
 				StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
-				if (push!= null && !push.isSelf()) {
-					// 涓嶆槸鏈钩鍙扮殑灏卞彂閫乺edis娑堟伅璁╁叾浠杦vp鍋滄鍙戞祦
-					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-					if (platform != null) {
-						RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
-//						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
-					}
+				if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+					redisRpcService.stopSendRtp(sendRtpItem);
 				}else {
 					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 					redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index f12f38a..59ff50c 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -29,7 +29,6 @@
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -86,16 +85,13 @@
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
-    private IInviteStreamService inviteStreamService;
+    private IRedisRpcService redisRpcService;
 
     @Autowired
     private SSRCFactory ssrcFactory;
 
     @Autowired
     private DynamicTask dynamicTask;
-
-    @Autowired
-    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
     @Autowired
     private IPlayService playService;
@@ -120,9 +116,6 @@
 
     @Autowired
     private UserSetting userSetting;
-
-    @Autowired
-    private RedisPlatformPushStreamOnlineLister mediaListManager;
 
     @Autowired
     private SipConfig config;
@@ -594,15 +587,17 @@
                     sendRtpItem.setSessionName(sessionName);
 
                     if ("push".equals(gbStream.getStreamType())) {
+                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                         if (streamPushItem != null) {
                             // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦
                             OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
-
                             if (pushListItem != null) {
                                 sendRtpItem.setServerId(pushListItem.getSeverId());
+                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
+
                                 StreamPushItem transform = streamPushService.transform(pushListItem);
                                 transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
-                                // 鎺ㄦ祦鐘舵��
+                                // 寮�濮嬫帹娴�
                                 sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                             }else {
                                 if (!platform.isStartOfflinePush()) {
@@ -702,8 +697,6 @@
                 }
                 // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
                 sendRtpItem.setStatus(1);
-                sendRtpItem.setFromTag(request.getFromTag());
-                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                 SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                 if (response != null) {
                     sendRtpItem.setToTag(response.getToTag());
@@ -714,7 +707,6 @@
                     sendRtpItem.setSsrc(ssrc);
                 }
                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
             } else {
                 // 涓嶅湪绾� 鎷夎捣
                 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
@@ -769,18 +761,14 @@
         dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
             logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
             try {
-                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
-                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                 responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
             } catch (SipException | InvalidArgumentException | ParseException e) {
                 logger.error("鏈鐞嗙殑寮傚父 ", e);
             }
         }, userSetting.getPlatformPlayTimeout());
-        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
-        // 娣诲姞涓婄嚎鐨勯�氱煡
-        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
+        //
+        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> {
             dynamicTask.stop(sendRtpItem.getCallId());
-            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
             if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
 
                 int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -813,19 +801,7 @@
                 // 鍏朵粬骞冲彴鍐呭
                 otherWvpPushStream(sendRtpItemFromRedis, request, platform);
             }
-        });
 
-        // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
-        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
-            if (response.getCode() != 0) {
-                dynamicTask.stop(sendRtpItem.getCallId());
-                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
-                try {
-                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
-                }
-            }
         });
     }
 
@@ -836,12 +812,9 @@
      */
     private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
         logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅");
-        // 鍙戦�乺edis娑堟伅
-        redisCatchStorage.sendStartSendRtp(sendRtpItem);
+        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem);
         // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
         sendRtpItem.setStatus(1);
-        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
-        sendRtpItem.setFromTag(request.getFromTag());
         SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
         if (response != null) {
             sendRtpItem.setToTag(response.getToTag());
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 346b4a2..32bf76a 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -23,7 +23,6 @@
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -99,9 +98,6 @@
 
     @Autowired
     private EventPublisher eventPublisher;
-
-    @Autowired
-    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
 
     @Autowired
     private ZlmHttpHookSubscribe subscribe;
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
index 027e990..ec24abe 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -365,4 +365,13 @@
         }
         return result;
     }
+
+    public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) {
+        Map<String, Object> param = new HashMap<>();
+        param.put("vhost", "__defaultVhost__");
+        param.put("app", sendRtpItem.getApp());
+        param.put("stream", sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+        return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
index 2e6151d..972db32 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -6,7 +6,6 @@
 import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
 import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
 
 import java.util.List;
 
@@ -97,4 +96,5 @@
 
     List<MediaServerItem> getAllWithAssistPort();
 
+    MediaServerItem getMediaServerByAppAndStream(String app, String stream);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index 190d665..aeb0dc8 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -25,7 +25,6 @@
 import com.genersoft.iot.vmp.utils.JsonUtil;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -36,19 +35,15 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
-import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
 
 import java.io.File;
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 /**
  * 濯掍綋鏈嶅姟鍣ㄨ妭鐐圭鐞�
@@ -751,6 +746,22 @@
 
     @Override
     public List<MediaServerItem> getAllWithAssistPort() {
+
         return mediaServerMapper.queryAllWithAssistPort();
     }
+
+    @Override
+    public MediaServerItem getMediaServerByAppAndStream(String app, String stream) {
+        List<MediaServerItem> mediaServerItemList = getAllOnline();
+        if (mediaServerItemList.isEmpty()) {
+            return null;
+        }
+        for (MediaServerItem mediaServerItem : mediaServerItemList) {
+            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
+            if (streamReady) {
+                return mediaServerItem;
+            }
+        }
+        return null;
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
new file mode 100644
index 0000000..a601ae9
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -0,0 +1,16 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+
+public interface IRedisRpcService {
+
+    SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem);
+
+    WVPResult startSendRtp(SendRtpItem sendRtpItem);
+
+    void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
+
+    WVPResult stopSendRtp(SendRtpItem sendRtpItem);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
deleted file mode 100755
index 8ad0807..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * @author lin
- */
-@Component
-public class RedisPlatformPushStreamOnlineLister implements MessageListener {
-
-    private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister");
-
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-    /**
-     * 閫氳繃redis娑堟伅鎺ユ敹娴佷笂绾跨殑閫氱煡锛屽鏋滄湰鏈虹敱瀵硅繖涓祦鐨勭洃鍚紝鍒欏洖璋�
-     */
-    @Override
-    public void onMessage(Message message, byte[] pattern) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
-                    sendStreamEvent(sendRtpItem);
-                }
-            });
-        }
-    }
-
-    private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
-
-    public void sendStreamEvent(SendRtpItem sendRtpItem) {
-        // 鏌ョ湅鎺ㄦ祦鐘舵��
-        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
-        if (channelOnlineEventLister != null)  {
-            try {
-                channelOnlineEventLister.run(sendRtpItem);
-            } catch (ParseException e) {
-                logger.error("sendStreamEvent: ", e);
-            }
-            removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
-        }
-    }
-
-    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
-        this.channelOnPublishEvents.put(app + "_" + stream, callback);
-    }
-
-    public void removedChannelOnlineEventLister(String app, String stream) {
-        this.channelOnPublishEvents.remove(app + "_" + stream);
-    }
-
-    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
-        return this.channelOnPublishEvents.get(app + "_" + stream);
-    }
-
-
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
deleted file mode 100755
index 25dd334..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * 鏀跺埌娑堟伅鍚庡紑濮嬬粰涓婄骇鍙戞祦
- * @author lin
- */
-@Component
-public class RedisPlatformStartSendRtpListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class);
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
-                        sendRtpItem.getMediaServerId();
-                        MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                        if (mediaServer == null) {
-                            return;
-                        }
-                        Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem);
-                        sendRtp(sendRtpItem, mediaServer, sendRtpParam);
-
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
-    }
-
-    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
-        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
-        Map<String, Object> param = new HashMap<>(12);
-        param.put("vhost","__defaultVhost__");
-        param.put("app",sendRtpItem.getApp());
-        param.put("stream",sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
-        param.put("dst_url",sendRtpItem.getIp());
-        param.put("dst_port", sendRtpItem.getPort());
-        param.put("src_port", sendRtpItem.getLocalPort());
-        param.put("pt", sendRtpItem.getPt());
-        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-        param.put("is_udp", isUdp);
-        if (!sendRtpItem.isTcp()) {
-            // udp妯″紡涓嬪紑鍚痳tcp淇濇椿
-            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
-        }
-        return param;
-    }
-
-    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
-        JSONObject startSendRtpStreamResult = null;
-        if (sendRtpItem.getLocalPort() != 0) {
-            if (sendRtpItem.isTcpActive()) {
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-            }else {
-                param.put("dst_url", sendRtpItem.getIp());
-                param.put("dst_port", sendRtpItem.getPort());
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-            }
-        }else {
-            if (sendRtpItem.isTcpActive()) {
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-            }else {
-                param.put("dst_url", sendRtpItem.getIp());
-                param.put("dst_port", sendRtpItem.getPort());
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-            }
-        }
-        return startSendRtpStreamResult;
-
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
deleted file mode 100755
index 25600a2..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * 涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅
- * @author lin
- */
-@Component
-public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class);
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private ZlmHttpHookSubscribe hookSubscribe;
-
-    @Autowired
-    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
-
-    @Autowired
-    private SSRCFactory ssrcFactory;
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    /**
-     * 褰撲笂绾х偣鎾椂锛岃繖閲岃礋璐g洃鍚瓑鍒版祦涓婄嚎锛屾祦涓婄嚎鍚庡鏋滄槸鍦ㄥ綋鍓嶆湇鍔″垯鐩存帴鍥炶皟锛屽鏋滄槸鍏朵粬wvp锛屽垯鐢眗edis娑堟伅杩涜閫氱煡
-     */
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
-                        if (messageForPushChannel == null
-                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
-                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
-                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
-                            continue;
-                        }
-
-                        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
-                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
-                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
-                                null);
-                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
-                            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
-                            SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream());
-                            if (sendRtpItem.getSsrc() == null) {
-                                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
-                                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
-                                sendRtpItem.setSsrc(ssrc);
-                                sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
-                                sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
-                                redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem);
-                                // 閫氱煡鍏朵粬wvp锛� 鐢盧edisPlatformPushStreamOnlineLister鎺ユ敹姝ょ洃鍚��
-                                redisCatchStorage.sendPushStreamOnline(sendRtpItem);
-                            }
-                        });
-
-
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
deleted file mode 100755
index a031573..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰
- * @author lin
- */
-@Component
-public class RedisPushStreamCloseResponseListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
-
-    @Autowired
-    private IStreamPushService streamPushService;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private ISIPCommanderForPlatform commanderFroPlatform;
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-
-    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
-    public interface PushStreamResponseEvent{
-        void run(MessageForPushChannelResponse response);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody()));
-        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
-        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
-        if (push != null) {
-            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
-                    push.getGbId());
-            if (!sendRtpItems.isEmpty()) {
-                for (SendRtpItem sendRtpItem : sendRtpItems) {
-                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
-                    if (parentPlatform != null) {
-                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
-                        try {
-                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
-                        } catch (SipException | InvalidArgumentException | ParseException e) {
-                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-
-    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
-        responseEvents.put(app + stream, callback);
-    }
-
-    public void removeEvent(String app, String stream) {
-        responseEvents.remove(app + stream);
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
deleted file mode 100755
index c90771b..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
- * @author lin
- */
-@Component
-public class RedisPushStreamResponseListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
-    public interface PushStreamResponseEvent{
-        void run(MessageForPushChannelResponse response);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
-                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
-                            logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
-                            continue;
-                        }
-                        // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
-                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
-                            responseEvents.get(response.getApp() + response.getStream()).run(response);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
-    }
-
-    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
-        responseEvents.put(app + stream, callback);
-    }
-
-    public void removeEvent(String app, String stream) {
-        responseEvents.remove(app + stream);
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
new file mode 100644
index 0000000..7a81eab
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -0,0 +1,203 @@
+package com.genersoft.iot.vmp.service.redisMsg.control;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+/**
+ * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢�
+ */
+@Component
+public class RedisRpcController {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private SendRtpPortManager sendRtpPortManager;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private ZLMServerFactory zlmServerFactory;
+
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+
+    /**
+     * 鑾峰彇鍙戞祦鐨勪俊鎭�
+     */
+    public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (mediaServerItem == null) {
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        // 鑷钩鍙板唴瀹�
+        int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+        if (localPort == 0) {
+            logger.info("[redis-rpc] getSendRtpItem->鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�" );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+        sendRtpItem.setStatus(1);
+        sendRtpItem.setServerId(userSetting.getServerId());
+        sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+        if (sendRtpItem.getSsrc() == null) {
+            // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+            String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+            sendRtpItem.setSsrc(ssrc);
+        }
+        redisCatchStorage.updateSendRTPSever(sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        response.setBody(sendRtpItem);
+        return response;
+    }
+
+    /**
+     * 鐩戝惉娴佷笂绾�
+     */
+    public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (mediaServerItem != null) {
+            logger.info("[redis-rpc] 鐩戝惉娴佷笂绾挎椂鍙戠幇娴佸凡瀛樺湪鐩存帴杩斿洖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            RedisRpcResponse response = request.getResponse();
+            response.setBody(sendRtpItem);
+            response.setStatusCode(200);
+        }
+        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+
+        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+            logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡紝娴佸凡涓婄嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+            if (sendRtpItem.getSsrc() == null) {
+                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+            RedisRpcResponse response = request.getResponse();
+            response.setBody(sendRtpItem);
+            response.setStatusCode(200);
+            // 鎵嬪姩鍙戦�佺粨鏋�
+            sendResponse(response);
+
+        });
+        return null;
+    }
+
+
+    /**
+     * 寮�濮嬪彂娴�
+     */
+    public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[redis-rpc] startSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+
+        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (!streamReady) {
+            logger.info("[redis-rpc] startSendRtp->娴佷笉鍦ㄧ嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (jsonObject.getInteger("code") == 0) {
+            WVPResult wvpResult = WVPResult.success();
+            response.setBody(wvpResult);
+        }else {
+            WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
+            response.setBody(wvpResult);
+        }
+        return response;
+    }
+
+    /**
+     * 鍋滄鍙戞祦
+     */
+    public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鍋滄鎺ㄦ祦锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[redis-rpc] stopSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (jsonObject.getInteger("code") == 0) {
+            logger.info("[redis-rpc] 鍋滄鎺ㄦ祦鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            WVPResult wvpResult = WVPResult.success();
+            response.setBody(wvpResult);
+        }else {
+            int code = jsonObject.getInteger("code");
+            String msg = jsonObject.getString("msg");
+            logger.info("[redis-rpc] 鍋滄鎺ㄦ祦澶辫触锛� {}/{}, code锛� {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(),code, msg );
+            WVPResult wvpResult = WVPResult.fail(code, msg);
+            response.setBody(wvpResult);
+        }
+        return response;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
new file mode 100644
index 0000000..e9a00a9
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -0,0 +1,100 @@
+package com.genersoft.iot.vmp.service.redisMsg.service;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class RedisRpcServiceImpl implements IRedisRpcService {
+
+
+    @Autowired
+    private RedisRpcConfig redisRpcConfig;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
+    private RedisRpcRequest buildRequest(String uri, Object param) {
+        RedisRpcRequest request = new RedisRpcRequest();
+        request.setFromId(userSetting.getServerId());
+        request.setParam(param);
+        request.setUri(uri);
+        return request;
+    }
+
+    @Override
+    public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) {
+
+        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem);
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
+    }
+
+    @Override
+    public WVPResult startSendRtp(SendRtpItem sendRtpItem) {
+        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+    }
+
+    @Override
+    public WVPResult stopSendRtp(SendRtpItem sendRtpItem) {
+        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+    }
+
+    @Override
+    public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) {
+        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+
+            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+            if (sendRtpItem.getSsrc() == null) {
+                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+            if (callback != null) {
+                callback.run(sendRtpItem);
+            }
+            hookSubscribe.removeSubscribe(hook);
+        });
+        RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        redisRpcConfig.request(request, response -> {
+            SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
+            if (callback != null) {
+                callback.run(sendRtpItemFromOther);
+            }
+        });
+
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 7088837..60ebfab 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -682,19 +682,20 @@
     @Override
     public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
         String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
-        redisTemplate.opsForValue().set(key, platformPlayTimeout);
+        redisTemplate.opsForValue().set(key, sendRtpItem);
     }
 
     @Override
     public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
         String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
-        return (SendRtpItem)redisTemplate.opsForValue().get(key);
+        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
     }
 
     @Override
     public void sendStartSendRtp(SendRtpItem sendRtpItem) {
         String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
-        redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
+        logger.info("[redis鍙戦�侀�氱煡] 閫氱煡鍏朵粬WVP鎺ㄦ祦 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
     }
 
     @Override

--
Gitblit v1.8.0