From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 23 四月 2024 20:59:20 +0800
Subject: [PATCH] Merge branch 'refs/heads/2.7.0'

---
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java                                             |   20 
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java                                          |   17 
 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java                                             |   24 
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java                                                    |   65 +
 src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java                                               |    9 
 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java                                       |   60 +
 src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java                                                  |   11 
 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java                                     |   29 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java                    |    2 
 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java                                            |   99 +
 数据库/2.7.0/更新-mysql-2.7.0.sql                                                                                         |   12 
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java                                                  |   30 
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                                         |   78 +
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java                     |   22 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java                                           |   22 
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                                                 |   13 
 src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java                                                            |    2 
 src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java                                          |    1 
 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java                          |    4 
 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java                                            |    4 
 数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql                                                                           |   12 
 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java                                              |    3 
 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java                                             |   93 +
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java        |  363 +----
 src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java                                                       |    7 
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                                                |   12 
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java                                                   |  225 ++++
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java                            |    0 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java |  199 +++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java                     |   23 
 src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java                                                |    3 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java                  |  511 ++++-----
 /dev/null                                                                                                            |   97 -
 web_src/src/components/channelList.vue                                                                               |   12 
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java                                               |  357 ++++++
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java                                 |  304 +++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java                           |    4 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java                  |  276 ++--
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java                                |  155 ++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java                                                  |    9 
 src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java                                                  |    1 
 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java                                            |   29 
 42 files changed, 2,374 insertions(+), 845 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index df230d4..c4911e4 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -72,6 +72,9 @@
 
 	public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
 	public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
+	public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
+	public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
+	public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:";
 
 
 
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
index a9f5c88..a9b17ae 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -66,7 +66,7 @@
 
     private List<String> allowedOrigins = new ArrayList<>();
 
-    private int maxNotifyCountQueue = 10000;
+    private int maxNotifyCountQueue = 100000;
 
     private int registerAgainAfterTime = 60;
 
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index c14ebcd..dcf2830 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -29,25 +29,21 @@
 	private RedisAlarmMsgListener redisAlarmMsgListener;
 
 	@Autowired
-	private RedisStreamMsgListener redisStreamMsgListener;
-
-	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
-	@Autowired
 	private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
 
 	@Autowired
 	private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
 
-	@Autowired
-	private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 	@Autowired
 	private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
 
+
 	@Autowired
-	private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
+	private RedisRpcConfig redisRpcConfig;
+
+	@Autowired
+	private RedisPushStreamResponseListener redisPushStreamCloseResponseListener;
 
 
 	/**
@@ -64,13 +60,11 @@
         container.setConnectionFactory(connectionFactory);
 		container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
 		container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
-		container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
-		container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
-		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
-		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
+		container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
+		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
         return container;
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
new file mode 100644
index 0000000..661e370
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
@@ -0,0 +1,225 @@
+package com.genersoft.iot.vmp.conf.redis;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisRpcConfig implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
+
+    public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
+
+    private final Random random = new Random();
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private RedisRpcController redisRpcController;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
+                        if (redisRpcMessage.getRequest() != null) {
+                            handlerRequest(redisRpcMessage.getRequest());
+                        } else if (redisRpcMessage.getResponse() != null){
+                            handlerResponse(redisRpcMessage.getResponse());
+                        } else {
+                            logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage));
+                        }
+                    } catch (Exception e) {
+                        logger.error("[redis rpc 瑙f瀽寮傚父] ", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private void handlerResponse(RedisRpcResponse response) {
+        if (userSetting.getServerId().equals(response.getToId())) {
+            return;
+        }
+        logger.info("[redis-rpc] << {}", response);
+        response(response);
+    }
+
+    private void handlerRequest(RedisRpcRequest request) {
+        try {
+            if (userSetting.getServerId().equals(request.getFromId())) {
+                return;
+            }
+            logger.info("[redis-rpc] << {}", request);
+            Method method = getMethod(request.getUri());
+            // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404
+            if (userSetting.getServerId().equals(request.getToId())) {
+                if (method == null) {
+                    // 鍥炲404缁撴灉
+                    RedisRpcResponse response = request.getResponse();
+                    response.setStatusCode(404);
+                    sendResponse(response);
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if(response != null) {
+                    sendResponse(response);
+                }
+            }else {
+                if (method == null) {
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if (response != null) {
+                    sendResponse(response);
+                }
+            }
+        }catch (InvocationTargetException | IllegalAccessException e) {
+            logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e);
+        }
+
+    }
+
+    private Method getMethod(String name) {
+        // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ
+        Method[] methods = redisRpcController.getClass().getMethods();
+        for (Method method : methods) {
+            if (method.getName().equals(name)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        logger.info("[redis-rpc] >> {}", response);
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+    private void sendRequest(RedisRpcRequest request){
+        logger.info("[redis-rpc] >> {}", request);
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setRequest(request);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+
+    private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
+    private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
+
+    public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
+
+        try {
+            sendRequest(request);
+            return subscribe.poll(timeOut, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
+        } finally {
+            this.unsubscribe(request.getSn());
+        }
+        return null;
+    }
+
+    public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        setCallback(request.getSn(), callback);
+        sendRequest(request);
+    }
+
+    public Boolean response(RedisRpcResponse response) {
+        SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
+        CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
+        if (queue != null) {
+            try {
+                return queue.offer(response, 2, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                logger.error("{}", e.getMessage(), e);
+            }
+        }else if (callback != null) {
+            callback.run(response);
+            callbacks.remove(response.getSn());
+        }
+        return false;
+    }
+
+    private void unsubscribe(long key) {
+        topicSubscribers.remove(key);
+    }
+
+
+    private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
+        SynchronousQueue<RedisRpcResponse> queue = null;
+        if (!topicSubscribers.containsKey(key))
+            topicSubscribers.put(key, queue = new SynchronousQueue<>());
+        return queue;
+    }
+
+    private void setCallback(long key, CommonCallback<RedisRpcResponse> callback)  {
+        // TODO 濡傛灉澶氫釜涓婄骇鐐规挱鍚屼竴涓�氶亾浼氭湁闂
+        callbacks.put(key, callback);
+    }
+
+    public void removeCallback(long key)  {
+        callbacks.remove(key);
+    }
+
+
+    public int getCallbackCount(){
+        return callbacks.size();
+    }
+
+//    @Scheduled(fixedRate = 1000)   //姣�1绉掓墽琛屼竴娆�
+//    public void execute(){
+//        logger.info("callbacks鐨勯暱搴�: " + callbacks.size());
+//        logger.info("闃熷垪鐨勯暱搴�: " + topicSubscribers.size());
+//        logger.info("HOOK鐩戝惉鐨勯暱搴�: " + hookSubscribe.size());
+//        logger.info("");
+//    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
new file mode 100644
index 0000000..061df6f
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
@@ -0,0 +1,24 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+public class RedisRpcMessage {
+
+    private RedisRpcRequest request;
+
+    private RedisRpcResponse response;
+
+    public RedisRpcRequest getRequest() {
+        return request;
+    }
+
+    public void setRequest(RedisRpcRequest request) {
+        this.request = request;
+    }
+
+    public RedisRpcResponse getResponse() {
+        return response;
+    }
+
+    public void setResponse(RedisRpcResponse response) {
+        this.response = response;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
new file mode 100644
index 0000000..a02db67
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
@@ -0,0 +1,93 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 閫氳繃redis鍙戦�佽姹�
+ */
+public class RedisRpcRequest {
+
+    /**
+     * 鏉ヨ嚜鐨刉VP ID
+     */
+    private String fromId;
+
+
+    /**
+     * 鐩爣鐨刉VP ID
+     */
+    private String toId;
+
+    /**
+     * 搴忓垪鍙�
+     */
+    private long sn;
+
+    /**
+     * 璁块棶鐨勮矾寰�
+     */
+    private String uri;
+
+    /**
+     * 鍙傛暟
+     */
+    private Object param;
+
+    public String getFromId() {
+        return fromId;
+    }
+
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    public String getToId() {
+        return toId;
+    }
+
+    public void setToId(String toId) {
+        this.toId = toId;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Object getParam() {
+        return param;
+    }
+
+    public void setParam(Object param) {
+        this.param = param;
+    }
+
+    public long getSn() {
+        return sn;
+    }
+
+    public void setSn(long sn) {
+        this.sn = sn;
+    }
+
+    @Override
+    public String toString() {
+        return "RedisRpcRequest{" +
+                "uri='" + uri + '\'' +
+                ", fromId='" + fromId + '\'' +
+                ", toId='" + toId + '\'' +
+                ", sn=" + sn +
+                ", param=" + param +
+                '}';
+    }
+
+    public RedisRpcResponse getResponse() {
+        RedisRpcResponse response = new RedisRpcResponse();
+        response.setFromId(fromId);
+        response.setToId(toId);
+        response.setSn(sn);
+        response.setUri(uri);
+        return response;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
new file mode 100644
index 0000000..21f9e7e
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
@@ -0,0 +1,99 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 閫氳繃redis鍙戦�佸洖澶�
+ */
+public class RedisRpcResponse {
+
+    /**
+     * 鏉ヨ嚜鐨刉VP ID
+     */
+    private String fromId;
+
+
+    /**
+     * 鐩爣鐨刉VP ID
+     */
+    private String toId;
+
+
+    /**
+     * 搴忓垪鍙�
+     */
+    private long sn;
+
+    /**
+     * 鐘舵�佺爜
+     */
+    private int statusCode;
+
+    /**
+     * 璁块棶鐨勮矾寰�
+     */
+    private String uri;
+
+    /**
+     * 鍙傛暟
+     */
+    private Object body;
+
+    public String getFromId() {
+        return fromId;
+    }
+
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    public String getToId() {
+        return toId;
+    }
+
+    public void setToId(String toId) {
+        this.toId = toId;
+    }
+
+    public long getSn() {
+        return sn;
+    }
+
+    public void setSn(long sn) {
+        this.sn = sn;
+    }
+
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public void setStatusCode(int statusCode) {
+        this.statusCode = statusCode;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Object getBody() {
+        return body;
+    }
+
+    public void setBody(Object body) {
+        this.body = body;
+    }
+
+    @Override
+    public String toString() {
+        return "RedisRpcResponse{" +
+                "uri='" + uri + '\'' +
+                ", fromId='" + fromId + '\'' +
+                ", toId='" + toId + '\'' +
+                ", sn=" + sn +
+                ", statusCode=" + statusCode +
+                ", body=" + body +
+                '}';
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
index 32b6fac..8290a45 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
@@ -187,6 +187,18 @@
 	private double latitude;
 
 	/**
+	 * 缁忓害
+	 */
+	@Schema(description = "鑷畾涔夌粡搴�")
+	private double customLongitude;
+
+	/**
+	 * 绾害
+	 */
+	@Schema(description = "鑷畾涔夌含搴�")
+	private double customLatitude;
+
+	/**
 	 * 缁忓害 GCJ02
 	 */
 	@Schema(description = "GCJ02鍧愭爣绯荤粡搴�")
@@ -226,7 +238,7 @@
 	 *  鏄惁鍚湁闊抽
 	 */
 	@Schema(description = "鏄惁鍚湁闊抽")
-	private boolean hasAudio;
+	private Boolean hasAudio;
 
 	/**
 	 * 鏍囪閫氶亾鐨勭被鍨嬶紝0->鍥芥爣閫氶亾 1->鐩存挱娴侀�氶亾 2->涓氬姟鍒嗙粍/铏氭嫙缁勭粐/琛屾斂鍖哄垝
@@ -586,4 +598,20 @@
 	public void setStreamIdentification(String streamIdentification) {
 		this.streamIdentification = streamIdentification;
 	}
+
+	public double getCustomLongitude() {
+		return customLongitude;
+	}
+
+	public void setCustomLongitude(double customLongitude) {
+		this.customLongitude = customLongitude;
+	}
+
+	public double getCustomLatitude() {
+		return customLatitude;
+	}
+
+	public void setCustomLatitude(double customLatitude) {
+		this.customLatitude = customLatitude;
+	}
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index c133c82..55f09df 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -2,6 +2,8 @@
 
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
 
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+
 public class SendRtpItem {
 
     /**
@@ -23,6 +25,11 @@
      * 骞冲彴id
      */
     private String platformId;
+
+    /**
+     * 骞冲彴鍚嶇О
+     */
+    private String platformName;
 
      /**
      * 瀵瑰簲璁惧id
@@ -64,6 +71,11 @@
     private boolean tcpActive;
 
     /**
+     * 鑷繁鎺ㄦ祦浣跨敤鐨処P
+     */
+    private String localIp;
+
+    /**
      * 鑷繁鎺ㄦ祦浣跨敤鐨勭鍙�
      */
     private int localPort;
@@ -81,7 +93,7 @@
     /**
      *  invite 鐨� callId
      */
-    private String CallId;
+    private String callId;
 
     /**
      *  invite 鐨� fromTag
@@ -124,6 +136,11 @@
      */
     private String receiveStream;
 
+    /**
+     * 涓婄骇鐨勭偣鎾被鍨�
+     */
+    private String sessionName;
+
     public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
         SendRtpItem sendRtpItem = new SendRtpItem();
         sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
@@ -138,7 +155,7 @@
         sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
         sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
         return sendRtpItem;
-        
+
     }
 
     public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
@@ -262,11 +279,11 @@
     }
 
     public String getCallId() {
-        return CallId;
+        return callId;
     }
 
     public void setCallId(String callId) {
-        CallId = callId;
+        this.callId = callId;
     }
 
     public InviteStreamType getPlayType() {
@@ -341,6 +358,30 @@
         this.receiveStream = receiveStream;
     }
 
+    public String getPlatformName() {
+        return platformName;
+    }
+
+    public void setPlatformName(String platformName) {
+        this.platformName = platformName;
+    }
+
+    public String getLocalIp() {
+        return localIp;
+    }
+
+    public void setLocalIp(String localIp) {
+        this.localIp = localIp;
+    }
+
+    public String getSessionName() {
+        return sessionName;
+    }
+
+    public void setSessionName(String sessionName) {
+        this.sessionName = sessionName;
+    }
+
     @Override
     public String toString() {
         return "SendRtpItem{" +
@@ -348,6 +389,7 @@
                 ", port=" + port +
                 ", ssrc='" + ssrc + '\'' +
                 ", platformId='" + platformId + '\'' +
+                ", platformName='" + platformName + '\'' +
                 ", deviceId='" + deviceId + '\'' +
                 ", app='" + app + '\'' +
                 ", channelId='" + channelId + '\'' +
@@ -355,10 +397,11 @@
                 ", stream='" + stream + '\'' +
                 ", tcp=" + tcp +
                 ", tcpActive=" + tcpActive +
+                ", localIp='" + localIp + '\'' +
                 ", localPort=" + localPort +
                 ", mediaServerId='" + mediaServerId + '\'' +
                 ", serverId='" + serverId + '\'' +
-                ", CallId='" + CallId + '\'' +
+                ", CallId='" + callId + '\'' +
                 ", fromTag='" + fromTag + '\'' +
                 ", toTag='" + toTag + '\'' +
                 ", pt=" + pt +
@@ -367,6 +410,18 @@
                 ", rtcp=" + rtcp +
                 ", playType=" + playType +
                 ", receiveStream='" + receiveStream + '\'' +
+                ", sessionName='" + sessionName + '\'' +
                 '}';
     }
+
+    public String getRedisKey() {
+        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
+                serverId + "_"
+                + mediaServerId + "_"
+                + platformId + "_"
+                + channelId + "_"
+                + stream + "_"
+                + callId;
+        return key;
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index 557563e..18ad2b0 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -49,6 +49,7 @@
         ParentPlatform parentPlatform = null;
 
         Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
+        Map<String, DeviceChannel> channelMap = new HashMap<>();
         if (!ObjectUtils.isEmpty(event.getPlatformId())) {
             subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
             if (subscribe == null) {
@@ -67,6 +68,7 @@
                     for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
                         List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
                         parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
+                        channelMap.put(deviceChannel.getChannelId(), deviceChannel);
                     }
                 }
             }else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@
                                 }
                                 logger.info("[Catalog浜嬩欢: {}]骞冲彴锛歿}锛屽奖鍝嶉�氶亾{}", event.getType(), platform.getServerGBId(), gbId);
                                 List<DeviceChannel> deviceChannelList = new ArrayList<>();
-                                DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
+                                DeviceChannel deviceChannel = channelMap.get(gbId);
                                 deviceChannelList.add(deviceChannel);
                                 GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
                                 if(gbStream != null){
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index 1c8353d..9fb1d4d 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -592,6 +592,7 @@
         Integer finalIndex = index;
         String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
                 deviceChannels.size(), type, subscribeInfo);
+        System.out.println(catalogXmlContent);
         logger.info("[鍙戦�丯OTIFY閫氱煡]绫诲瀷锛� {}锛屽彂閫佹暟閲忥細 {}", type, channels.size());
         sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
             logger.error("鍙戦�丯OTIFY閫氱煡娑堟伅澶辫触銆傞敊璇細{} {}", eventResult.statusCode, eventResult.msg);
@@ -621,7 +622,6 @@
 
     private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
         StringBuffer catalogXml = new StringBuffer(600);
-
         String characterSet = parentPlatform.getCharacterSet();
         catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
                 .append("<Notify>\r\n")
@@ -660,6 +660,8 @@
                                 .append("<Owner> " + channel.getOwner()+ "</Owner>\r\n")
                                 .append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n")
                                 .append("<Address>" + channel.getAddress() + "</Address>\r\n");
+                        catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
+                        catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
                     }
                     if (!"presence".equals(subscribeInfo.getEventType())) {
                         catalogXml.append("<Event>" + type + "</Event>\r\n");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
index 7cbfe70..f3f7431 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -6,7 +6,6 @@
 import com.google.common.primitives.Bytes;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
-import org.apache.commons.lang3.ArrayUtils;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
@@ -172,6 +171,7 @@
 		return getRootElement(evt, "gb2312");
 	}
 	public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
+
 		if (charset == null) {
 			charset = "gb2312";
 		}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 10922f4..b76751c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -13,10 +13,11 @@
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IPlayService;
-import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -51,6 +52,8 @@
 
 	@Autowired
     private IRedisCatchStorage redisCatchStorage;
+	@Autowired
+    private IRedisRpcService redisRpcService;
 
 	@Autowired
     private UserSetting userSetting;
@@ -68,9 +71,6 @@
 	private DynamicTask dynamicTask;
 
 	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
-	@Autowired
 	private IPlayService playService;
 
 
@@ -86,7 +86,7 @@
 		logger.info("[鏀跺埌ACK]锛� 鏉ヨ嚜->{}", fromUserId);
 		SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
 		if (sendRtpItem == null) {
-			logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛岀洰鏍囦负({})鐨勬帹娴佷俊鎭�",fromUserId, toUserId);
+			logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛宑allId: {}", fromUserId, callIdHeader.getCallId());
 			return;
 		}
 		// tcp涓诲姩鏃讹紝姝ゆ椂鏄骇鑱斾笅绾у钩鍙帮紝鍦ㄥ洖澶�200ok鏃讹紝鏈湴宸茬粡璇锋眰zlm寮�鍚洃鍚紝璺宠繃涓嬮潰姝ラ
@@ -106,10 +106,13 @@
 
 		if (parentPlatform != null) {
 			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
-				redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
-					playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
-				});
+				WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
+				if (wvpResult.getCode() == 0) {
+                    RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+                    redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
+                        playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
+                    });
+				}
 			} else {
 				try {
 					if (sendRtpItem.isTcpActive()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 302b694..b8eb703 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -16,10 +16,10 @@
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -91,7 +91,8 @@
 	private IStreamPushService pushService;
 
 	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
+	private IRedisRpcService redisRpcService;
+
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -138,17 +139,8 @@
 					if (userSetting.getUseCustomSsrcForParentInvite()) {
 						mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
 					}
-
-					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-					if (platform != null) {
-						MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-								sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-								sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-						messageForPushChannel.setPlatFormIndex(platform.getId());
-						redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
-					}else {
-						logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
-					}
+				}else {
+					logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
 				}
 			}else {
 				MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 46e779d..f1f277f 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -31,11 +31,17 @@
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -51,6 +57,7 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.sdp.*;
@@ -64,6 +71,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Vector;
+import java.util.*;
 
 /**
  * SIP鍛戒护绫诲瀷锛� INVITE璇锋眰
@@ -92,16 +100,16 @@
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
-    private IInviteStreamService inviteStreamService;
+    private IRedisRpcService redisRpcService;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
 
     @Autowired
     private SSRCFactory ssrcFactory;
 
     @Autowired
     private DynamicTask dynamicTask;
-
-    @Autowired
-    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
     @Autowired
     private IPlayService playService;
@@ -125,17 +133,16 @@
     private UserSetting userSetting;
 
     @Autowired
-    private ZLMMediaListManager mediaListManager;
-
-    @Autowired
     private SipConfig config;
-
-
-    @Autowired
-    private RedisGbPlayMsgListener redisGbPlayMsgListener;
 
     @Autowired
     private VideoStreamSessionManager streamSession;
+
+    @Autowired
+    private SendRtpPortManager sendRtpPortManager;
+
+    @Autowired
+    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 
     @Override
@@ -552,43 +559,79 @@
 
                     }
                 } else if (gbStream != null) {
-
-                    String ssrc;
-                    if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
-                        // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
-                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
-                    }else {
-                        ssrc = gb28181Sdp.getSsrc();
+                    SendRtpItem sendRtpItem = new SendRtpItem();
+                    if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
+                        sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
                     }
 
+                    if (tcpActive != null) {
+                        sendRtpItem.setTcpActive(tcpActive);
+                    }
+                    sendRtpItem.setTcp(mediaTransmissionTCP);
+                    sendRtpItem.setRtcp(platform.isRtcp());
+                    sendRtpItem.setPlatformName(platform.getName());
+                    sendRtpItem.setPlatformId(platform.getServerGBId());
+                    sendRtpItem.setMediaServerId(mediaServerItem.getId());
+                    sendRtpItem.setChannelId(channelId);
+                    sendRtpItem.setIp(addressStr);
+                    sendRtpItem.setPort(port);
+                    sendRtpItem.setUsePs(true);
+                    sendRtpItem.setApp(gbStream.getApp());
+                    sendRtpItem.setStream(gbStream.getStream());
+                    sendRtpItem.setCallId(callIdHeader.getCallId());
+                    sendRtpItem.setFromTag(request.getFromTag());
+                    sendRtpItem.setOnlyAudio(false);
+                    sendRtpItem.setStatus(0);
+                    sendRtpItem.setSessionName(sessionName);
+                    // 娓呯悊鍙兘瀛樺湪鐨勭紦瀛橀伩鍏嶇敤鍒版棫鐨勬暟鎹�
+                    List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream());
+                    if (!sendRtpItemList.isEmpty()) {
+                        for (SendRtpItem rtpItem : sendRtpItemList) {
+                            redisCatchStorage.deleteSendRTPServer(rtpItem);
+                        }
+                    }
                     if ("push".equals(gbStream.getStreamType())) {
+                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                         if (streamPushItem != null) {
                             // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦
                             StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                             if (pushListItem != null) {
-                                pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId()));
-                                // 鎺ㄦ祦鐘舵��
-                                pushStream(evt, request, gbStream, pushListItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                                sendRtpItem.setServerId(pushListItem.getSeverId());
+                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
+
+                                StreamPushItem transform = streamPushService.transform(pushListItem);
+                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
+                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
+                                // 寮�濮嬫帹娴�
+                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                             }else {
-                                // 鏈帹娴� 鎷夎捣
-                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                                if (!platform.isStartOfflinePush()) {
+                                    // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
+                                    try {
+                                        logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+                                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
+                                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
+                                    }
+                                    return;
+                                }
+                                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                             }
                         }
                     } else if ("proxy".equals(gbStream.getStreamType())) {
                         if (null != proxyByAppAndStream) {
+                            if (sendRtpItem.getSsrc() == null) {
+                                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                                String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                                sendRtpItem.setSsrc(ssrc);
+                            }
                             if (proxyByAppAndStream.isStatus()) {
-                                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                                sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
                             } else {
                                 //寮�鍚唬鐞嗘媺娴�
-                                notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                                notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                             }
                         }
-
-
                     }
                 }
             }
@@ -614,58 +657,13 @@
     /**
      * 瀹夋帓鎺ㄦ祦
      */
-    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
-                            CallIdHeader callIdHeader, MediaServer mediaServer,
-                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                            String channelId, String addressStr, String ssrc, String requesterId) {
-            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
+    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
             if (streamReady != null && streamReady) {
 
                 // 鑷钩鍙板唴瀹�
-                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
-            if (sendRtpItem == null) {
-                logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
-                try {
-                    responseAck(request, Response.BUSY_HERE);
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage());
-                }
-                return;
-            }
-            if (tcpActive != null) {
-                sendRtpItem.setTcpActive(tcpActive);
-            }
-            sendRtpItem.setPlayType(InviteStreamType.PUSH);
-            // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
-            sendRtpItem.setStatus(1);
-            sendRtpItem.setCallId(callIdHeader.getCallId());
-            sendRtpItem.setFromTag(request.getFromTag());
-
-            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
-            if (response != null) {
-                sendRtpItem.setToTag(response.getToTag());
-            }
-            redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
-        }
-
-    }
-
-    private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
-                            CallIdHeader callIdHeader, MediaServer mediaServerItem,
-                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                            String channelId, String addressStr, String ssrc, String requesterId) {
-        // 鎺ㄦ祦
-        if (streamPushItem.isSelf()) {
-            Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
-            if (streamReady != null && streamReady) {
-                // 鑷钩鍙板唴瀹�
-                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
-                if (sendRtpItem == null) {
+                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+                if (localPort == 0) {
                     logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
                     try {
                         responseAck(request, Response.BUSY_HERE);
@@ -674,226 +672,197 @@
                     }
                     return;
                 }
-                if (tcpActive != null) {
-                    sendRtpItem.setTcpActive(tcpActive);
+            sendRtpItem.setPlayType(InviteStreamType.PROXY);
+            // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+            sendRtpItem.setStatus(1);
+            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+
+            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
+            if (response != null) {
+                sendRtpItem.setToTag(response.getToTag());
+            }
+            redisCatchStorage.updateSendRTPSever(sendRtpItem);
+        }
+    }
+
+    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+        // 鎺ㄦ祦
+        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
+            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+            if (streamReady != null && streamReady) {
+                // 鑷钩鍙板唴瀹�
+                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+                if (localPort == 0) {
+                    logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+                    try {
+                        responseAck(request, Response.BUSY_HERE);
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage());
+                    }
+                    return;
                 }
-                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                 // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
                 sendRtpItem.setStatus(1);
-                sendRtpItem.setCallId(callIdHeader.getCallId());
-
-                sendRtpItem.setFromTag(request.getFromTag());
-                SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                 if (response != null) {
                     sendRtpItem.setToTag(response.getToTag());
                 }
+                if (sendRtpItem.getSsrc() == null) {
+                    // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                    String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                    sendRtpItem.setSsrc(ssrc);
+                }
                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
             } else {
                 // 涓嶅湪绾� 鎷夎捣
-                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
             }
-
         } else {
             // 鍏朵粬骞冲彴鍐呭
-            otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+            otherWvpPushStream(sendRtpItem, request, platform);
         }
     }
 
     /**
      * 閫氱煡娴佷笂绾�
      */
-    private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
-                                    CallIdHeader callIdHeader, MediaServer mediaServerItem,
-                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                                    String channelId, String addressStr, String ssrc, String requesterId) {
-        if ("proxy".equals(gbStream.getStreamType())) {
-            // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
-            logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
-            // 鐩戝惉娴佷笂绾�
-            Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId());
-            this.hookSubscribe.addSubscribe(hook, (hookData) -> {
-                logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", hookData.getApp(), hookData.getStream());
-                dynamicTask.stop(callIdHeader.getCallId());
-                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-            });
-            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
-                logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
-                this.hookSubscribe.removeSubscribe(hook);
-            }, userSetting.getPlatformPlayTimeout());
-            boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
-            if (!start) {
-                try {
-                    responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
-                }
-                this.hookSubscribe.removeSubscribe(hook);
-                dynamicTask.stop(callIdHeader.getCallId());
+    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+        // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
+        logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream());
+        // 鐩戝惉娴佷笂绾�
+        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
+        zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
+            OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
+            logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
+            dynamicTask.stop(sendRtpItem.getCallId());
+            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
+        });
+        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
+            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+        }, userSetting.getPlatformPlayTimeout());
+        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (!start) {
+            try {
+                responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
+            } catch (SipException | InvalidArgumentException | ParseException e) {
+                logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
             }
-        } else if ("push".equals(gbStream.getStreamType())) {
-            if (!platform.isStartOfflinePush()) {
-                // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
-                try {
-                    logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
-                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
-                }
-                return;
-            }
-            // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
-            logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
-
-            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
-                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
-                    platform.getName(), null, gbStream.getMediaServerId());
-            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
-            // 璁剧疆瓒呮椂
-            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
-                logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
-                try {
-                    redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
-                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
-                    responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("鏈鐞嗙殑寮傚父 ", e);
-                }
-            }, userSetting.getPlatformPlayTimeout());
-            // 娣诲姞鐩戝惉
-            int finalPort = port;
-            Boolean finalTcpActive = tcpActive;
-
-            // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡
-            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
-                dynamicTask.stop(callIdHeader.getCallId());
-                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
-                if (serverId.equals(userSetting.getServerId())) {
-                    SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
-                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
-
-                    if (sendRtpItem == null) {
-                        logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
-                        try {
-                            responseAck(request, Response.BUSY_HERE);
-                        } catch (SipException e) {
-                            logger.error("鏈鐞嗙殑寮傚父 ", e);
-                        } catch (InvalidArgumentException e) {
-                            logger.error("鏈鐞嗙殑寮傚父 ", e);
-                        } catch (ParseException e) {
-                            logger.error("鏈鐞嗙殑寮傚父 ", e);
-                        }
-                        return;
-                    }
-                    if (finalTcpActive != null) {
-                        sendRtpItem.setTcpActive(finalTcpActive);
-                    }
-                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
-                    // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
-                    sendRtpItem.setStatus(1);
-                    sendRtpItem.setCallId(callIdHeader.getCallId());
-
-                    sendRtpItem.setFromTag(request.getFromTag());
-                    SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
-                    if (response != null) {
-                        sendRtpItem.setToTag(response.getToTag());
-                    }
-                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
-                } else {
-                    // 鍏朵粬骞冲彴鍐呭
-                    otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-                }
-            });
-
-            // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
-            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
-                if (response.getCode() != 0) {
-                    dynamicTask.stop(callIdHeader.getCallId());
-                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
-                    try {
-                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
-                    } catch (SipException | InvalidArgumentException | ParseException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
-                    }
-                }
-            });
+            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+            dynamicTask.stop(sendRtpItem.getCallId());
         }
     }
 
     /**
-     * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴�
+     * 閫氱煡娴佷笂绾�
      */
-    private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
-                                    CallIdHeader callIdHeader, MediaServer mediaServerItem,
-                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                                    String channelId, String addressStr, String ssrc, String requesterId) {
-        logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅");
-        // 鍙戦�乺edis娑堟伅
-        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
-                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
-                channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
-                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
-                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
-                        logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
-                        try {
-                            responseAck(request, Response.BUSY_HERE);
-                        } catch (SipException e) {
-                            logger.error("鏈鐞嗙殑寮傚父 ", e);
-                        } catch (InvalidArgumentException e) {
-                            logger.error("鏈鐞嗙殑寮傚父 ", e);
-                        } catch (ParseException e) {
-                            logger.error("鏈鐞嗙殑寮傚父 ", e);
-                        }
-                        return;
-                    }
-                    // 鏀跺埌sendItem
-                    if (tcpActive != null) {
-                        sendRtpItem.setTcpActive(tcpActive);
-                    }
-                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
-                    // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
-                    sendRtpItem.setStatus(1);
-                    sendRtpItem.setCallId(callIdHeader.getCallId());
-
-                    sendRtpItem.setFromTag(request.getFromTag());
-                    SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
-                    if (response != null) {
-                        sendRtpItem.setToTag(response.getToTag());
-                    }
-                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
-                }, (wvpResult) -> {
-
-                    // 閿欒
-                    if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
-                        // 绂荤嚎
-                        // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡
-                        StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
-                        if (currentStreamPushItem.isPushIng()) {
-                            // 鍦ㄧ嚎鐘舵��
-                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-
-                        } else {
-                            // 涓嶅湪绾� 鎷夎捣
-                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-                        }
-                    }
+    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+        // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤
+        logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream());
+        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
+                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
+                platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+        // 璁剧疆瓒呮椂
+        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
+            logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
+            try {
+                responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
+            } catch (SipException | InvalidArgumentException | ParseException e) {
+                logger.error("鏈鐞嗙殑寮傚父 ", e);
+            }
+        }, userSetting.getPlatformPlayTimeout());
+        //
+        long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
+            dynamicTask.stop(sendRtpItem.getCallId());
+            if (sendRtpItemKey == null) {
+                logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦寰楀埌缁撴灉鏈┖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+                try {
+                    responseAck(request, Response.BUSY_HERE);
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("鏈鐞嗙殑寮傚父 ", e);
+                }
+                return;
+            }
+            SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
+            if (sendRtpItemFromRedis == null) {
+                logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦, 鏈壘鍒皉edis涓紦瀛樼殑鍙戞祦淇℃伅锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+                try {
+                    responseAck(request, Response.BUSY_HERE);
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("鏈鐞嗙殑寮傚父 ", e);
+                }
+                return;
+            }
+            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
+                logger.info("[绾ц仈鐐规挱] 绛夊緟鐨勬帹娴佸湪鏈钩鍙颁笂绾� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+                if (localPort == 0) {
+                    logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
                     try {
                         responseAck(request, Response.BUSY_HERE);
-                    } catch (InvalidArgumentException | ParseException | SipException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage());
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("鏈鐞嗙殑寮傚父 ", e);
                     }
-                });
+                    return;
+                }
+                sendRtpItem.setLocalPort(localPort);
+                if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
+                    sendRtpItem.setLocalIp(platform.getSendStreamIp());
+                }
+
+                // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+                sendRtpItem.setStatus(1);
+                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+                if (response != null) {
+                    sendRtpItem.setToTag(response.getToTag());
+                }
+                redisCatchStorage.updateSendRTPSever(sendRtpItem);
+            } else {
+                // 鍏朵粬骞冲彴鍐呭
+                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
+            }
+        });
+        // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+        // redis娑堟伅渚嬪锛� PUBLISH VM_MSG_STREAM_PUSH_RESPONSE  '{"code":1,"msg":"澶辫触","app":"1","stream":"2"}'
+        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
+            if (response.getCode() != 0) {
+                dynamicTask.stop(sendRtpItem.getCallId());
+                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
+                redisRpcService.removeCallback(key);
+                try {
+                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
+                }
+            }
+        });
     }
 
-    public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
 
-        String sdpIp = mediaServerItem.getSdpIp();
+
+    /**
+     * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴�
+     */
+    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
+        logger.info("[绾ц仈鐐规挱] 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey());
+        if (sendRtpItem == null) {
+            return;
+        }
+        // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+        sendRtpItem.setStatus(1);
+        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+        if (response != null) {
+            sendRtpItem.setToTag(response.getToTag());
+        }
+        redisCatchStorage.updateSendRTPSever(sendRtpItem);
+    }
+
+    public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
+
+        String sdpIp = sendRtpItem.getLocalIp();
         if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
             sdpIp = platform.getSendStreamIp();
         }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
index cd97786..de93804 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,7 +1,5 @@
 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
 
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
-import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -20,10 +18,10 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +37,6 @@
 
     private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
 
-	private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
-	private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
 	private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
 
 	private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -60,275 +56,110 @@
 	private IDeviceChannelService deviceChannelService;
 
 	@Autowired
-	private DynamicTask dynamicTask;
-
-	@Autowired
-	private CivilCodeFileConf civilCodeFileConf;
-
-	@Autowired
 	private SipConfig sipConfig;
 
-	private final static String talkKey = "notify-request-for-catalog-task";
+	@Transactional
+	public void process(List<RequestEvent> evtList) {
+		if (evtList.isEmpty()) {
+			return;
+		}
+		for (RequestEvent evt : evtList) {
+			try {
+				long start = System.currentTimeMillis();
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
 
-	public void process(RequestEvent evt) {
-		try {
-			long start = System.currentTimeMillis();
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null || !device.isOnLine()) {
+					logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" ));
+					return;
+				}
+				Element rootElement = getRootElement(evt, device.getCharset());
+				if (rootElement == null) {
+					logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest());
+					return;
+				}
+				Element deviceListElement = rootElement.element("DeviceList");
+				if (deviceListElement == null) {
+					return;
+				}
+				Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+				if (deviceListIterator != null) {
 
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null || !device.isOnLine()) {
-				logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" ));
-				return;
-			}
-			Element rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			Element deviceListElement = rootElement.element("DeviceList");
-			if (deviceListElement == null) {
-				return;
-			}
-			Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
-			if (deviceListIterator != null) {
-
-				// 閬嶅巻DeviceList
-				while (deviceListIterator.hasNext()) {
-					Element itemDevice = deviceListIterator.next();
-					Element channelDeviceElement = itemDevice.element("DeviceID");
-					if (channelDeviceElement == null) {
-						continue;
-					}
-					Element eventElement = itemDevice.element("Event");
-					String event;
-					if (eventElement == null) {
-						logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" ));
-						event = CatalogEvent.ADD;
-					}else {
-						event = eventElement.getText().toUpperCase();
-					}
-					DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
-					if (channel == null) {
-						logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent()));
-						continue;
-					}
-					if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
-						channel.setParentId(null);
-					}
-					channel.setDeviceId(device.getDeviceId());
-					logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId());
-					switch (event) {
-						case CatalogEvent.ON:
-							// 涓婄嚎
-							logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							updateChannelOnlineList.add(channel);
-							if (updateChannelOnlineList.size() > 300) {
-								executeSaveForOnline();
-							}
-							if (userSetting.getDeviceStatusNotify()) {
-								// 鍙戦�乺edis娑堟伅
-								redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
-							}
-
-							break;
-						case CatalogEvent.OFF :
-							// 绂荤嚎
-							logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.VLOST:
-							// 瑙嗛涓㈠け
-							logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.DEFECT:
-							// 鏁呴殰
-							logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
-								logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							}else {
-								updateChannelOfflineList.add(channel);
-								if (updateChannelOfflineList.size() > 300) {
-									executeSaveForOffline();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
-								}
-							}
-							break;
-						case CatalogEvent.ADD:
-							// 澧炲姞
-							logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							// 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪
-							DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannel != null) {
-								logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-								channel.setId(deviceChannel.getId());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
-								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
-								}
-
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
-							}
-
-							break;
-						case CatalogEvent.DEL:
-							// 鍒犻櫎
-							logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							deleteChannelList.add(channel);
-							if (userSetting.getDeviceStatusNotify()) {
-								// 鍙戦�乺edis娑堟伅
-								redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
-							}
-							if (deleteChannelList.size() > 300) {
-								executeSaveForDelete();
-							}
-							break;
-						case CatalogEvent.UPDATE:
-							// 鏇存柊
-							logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
-							// 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪
-							DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
-							if (deviceChannelForUpdate != null) {
-								channel.setId(deviceChannelForUpdate.getId());
-								channel.setUpdateTime(DateUtil.getNow());
-								updateChannelMap.put(channel.getChannelId(), channel);
-								if (updateChannelMap.keySet().size() > 300) {
-									executeSaveForUpdate();
-								}
-							}else {
-								addChannelMap.put(channel.getChannelId(), channel);
-								if (addChannelMap.keySet().size() > 300) {
-									executeSaveForAdd();
-								}
-								if (userSetting.getDeviceStatusNotify()) {
-									// 鍙戦�乺edis娑堟伅
-									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
-								}
-							}
-							break;
-						default:
-							logger.warn("[ NotifyCatalog ] event not found 锛� {}", event );
-
-					}
-					// 杞彂鍙樺寲淇℃伅
-					eventPublisher.catalogEventPublish(null, channel, event);
-
-					if (!updateChannelMap.keySet().isEmpty()
-							|| !addChannelMap.keySet().isEmpty()
-							|| !updateChannelOnlineList.isEmpty()
-							|| !updateChannelOfflineList.isEmpty()
-							|| !deleteChannelList.isEmpty()) {
-
-						if (!dynamicTask.contains(talkKey)) {
-							dynamicTask.startDelay(talkKey, this::executeSave, 1000);
+					// 閬嶅巻DeviceList
+					while (deviceListIterator.hasNext()) {
+						Element itemDevice = deviceListIterator.next();
+						Element eventElement = itemDevice.element("Event");
+						String event;
+						if (eventElement == null) {
+							logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" ));
+							event = CatalogEvent.ADD;
+						}else {
+							event = eventElement.getText().toUpperCase();
 						}
+						DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
+						if (channel == null) {
+							logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent()));
+							continue;
+						}
+						if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
+							channel.setParentId(null);
+						}
+						channel.setDeviceId(device.getDeviceId());
+						logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
+						switch (event) {
+							case CatalogEvent.ON:
+								// 涓婄嚎
+								deviceChannelService.online(channel);
+								if (userSetting.getDeviceStatusNotify()) {
+									// 鍙戦�乺edis娑堟伅
+									redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
+								}
+
+								break;
+							case CatalogEvent.OFF :
+							case CatalogEvent.VLOST:
+							case CatalogEvent.DEFECT:
+								// 绂荤嚎
+								if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+									logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
+								}else {
+									deviceChannelService.offline(channel);
+									if (userSetting.getDeviceStatusNotify()) {
+										// 鍙戦�乺edis娑堟伅
+										redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+									}
+								}
+								break;
+							case CatalogEvent.DEL:
+								// 鍒犻櫎
+								deviceChannelService.delete(channel);
+								if (userSetting.getDeviceStatusNotify()) {
+									// 鍙戦�乺edis娑堟伅
+									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
+								}
+								break;
+							case CatalogEvent.ADD:
+							case CatalogEvent.UPDATE:
+								// 鏇存柊
+								channel.setUpdateTime(DateUtil.getNow());
+								deviceChannelService.updateChannel(deviceId,channel);
+								if (userSetting.getDeviceStatusNotify()) {
+									// 鍙戦�乺edis娑堟伅
+									redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+								}
+								break;
+							default:
+								logger.warn("[ NotifyCatalog ] event not found 锛� {}", event );
+
+						}
+						// 杞彂鍙樺寲淇℃伅
+						eventPublisher.catalogEventPublish(null, channel, event);
 					}
 				}
+			} catch (DocumentException e) {
+				logger.error("鏈鐞嗙殑寮傚父 ", e);
 			}
-		} catch (DocumentException e) {
-			logger.error("鏈鐞嗙殑寮傚父 ", e);
 		}
 	}
-
-	private void executeSave(){
-		try {
-			executeSaveForAdd();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForUpdate();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForDelete();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForOnline();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e );
-		}
-		try {
-			executeSaveForOffline();
-		} catch (Exception e) {
-			logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e );
-		}
-		dynamicTask.stop(talkKey);
-	}
-
-	private void executeSaveForUpdate(){
-		if (!updateChannelMap.values().isEmpty()) {
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
-			updateChannelMap.clear();
-			deviceChannelService.batchUpdateChannel(deviceChannels);
-		}
-
-	}
-
-	private void executeSaveForAdd(){
-		if (!addChannelMap.values().isEmpty()) {
-			ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
-			addChannelMap.clear();
-			deviceChannelService.batchAddChannel(deviceChannels);
-		}
-	}
-
-	private void executeSaveForDelete(){
-		if (!deleteChannelList.isEmpty()) {
-			deviceChannelService.deleteChannels(deleteChannelList);
-			deleteChannelList.clear();
-		}
-	}
-
-	private void executeSaveForOnline(){
-		if (!updateChannelOnlineList.isEmpty()) {
-			deviceChannelService.channelsOnline(updateChannelOnlineList);
-			updateChannelOnlineList.clear();
-		}
-	}
-
-	private void executeSaveForOffline(){
-		if (!updateChannelOfflineList.isEmpty()) {
-			deviceChannelService.channelsOffline(updateChannelOfflineList);
-			updateChannelOfflineList.clear();
-		}
-	}
-
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
new file mode 100755
index 0000000..05a5336
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -0,0 +1,199 @@
+package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.ObjectUtils;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.FromHeader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊
+ */
+@Component
+public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent {
+
+
+    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
+
+
+	@Autowired
+	private UserSetting userSetting;
+
+	@Autowired
+	private EventPublisher eventPublisher;
+
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
+	@Autowired
+	private IDeviceChannelService deviceChannelService;
+
+	@Transactional
+	public void process(List<RequestEvent> eventList) {
+		if (eventList.isEmpty()) {
+			return;
+		}
+		Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
+		List<MobilePosition> addMobilePositionList = new ArrayList<>();
+		for (RequestEvent evt : eventList) {
+			try {
+				FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+				String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+				long startTime = System.currentTimeMillis();
+				// 鍥炲 200 OK
+				Element rootElement = getRootElement(evt);
+				if (rootElement == null) {
+					logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest());
+					return;
+				}
+				Device device = redisCatchStorage.getDevice(deviceId);
+				if (device == null) {
+					logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒癲evice,{}", deviceId);
+					return;
+				}
+				MobilePosition mobilePosition = new MobilePosition();
+				mobilePosition.setDeviceId(device.getDeviceId());
+				mobilePosition.setDeviceName(device.getName());
+				mobilePosition.setCreateTime(DateUtil.getNow());
+				List<Element> elements = rootElement.elements();
+				for (Element element : elements) {
+					switch (element.getName()){
+						case "DeviceID":
+							String channelId = element.getStringValue();
+							if (!deviceId.equals(channelId)) {
+								mobilePosition.setChannelId(channelId);
+							}
+							continue;
+						case "Time":
+							String timeVal = element.getStringValue();
+							if (ObjectUtils.isEmpty(timeVal)) {
+								mobilePosition.setTime(DateUtil.getNow());
+							} else {
+								mobilePosition.setTime(SipUtils.parseTime(timeVal));
+							}
+							continue;
+						case "Longitude":
+							mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
+							continue;
+						case "Latitude":
+							mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
+							continue;
+						case "Speed":
+							String speedVal = element.getStringValue();
+							if (NumericUtil.isDouble(speedVal)) {
+								mobilePosition.setSpeed(Double.parseDouble(speedVal));
+							} else {
+								mobilePosition.setSpeed(0.0);
+							}
+							continue;
+						case "Direction":
+							String directionVal = element.getStringValue();
+							if (NumericUtil.isDouble(directionVal)) {
+								mobilePosition.setDirection(Double.parseDouble(directionVal));
+							} else {
+								mobilePosition.setDirection(0.0);
+							}
+							continue;
+						case "Altitude":
+							String altitudeVal = element.getStringValue();
+							if (NumericUtil.isDouble(altitudeVal)) {
+								mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
+							} else {
+								mobilePosition.setAltitude(0.0);
+							}
+							continue;
+
+					}
+				}
+
+//			logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+//					mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
+				mobilePosition.setReportSource("Mobile Position");
+
+				// 鏇存柊device channel 鐨勭粡绾害
+				DeviceChannel deviceChannel = new DeviceChannel();
+				deviceChannel.setDeviceId(device.getDeviceId());
+				deviceChannel.setLongitude(mobilePosition.getLongitude());
+				deviceChannel.setLatitude(mobilePosition.getLatitude());
+				deviceChannel.setGpsTime(mobilePosition.getTime());
+				updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
+				addMobilePositionList.add(mobilePosition);
+
+
+				// 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭�
+				try {
+					eventPublisher.mobilePositionEventPublish(mobilePosition);
+				}catch (Exception e) {
+					logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e);
+				}
+				if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) {
+					List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
+					channels.forEach(channel -> {
+						// 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+						JSONObject jsonObject = new JSONObject();
+						jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+						jsonObject.put("serial", channel.getDeviceId());
+						jsonObject.put("code", channel.getChannelId());
+						jsonObject.put("longitude", mobilePosition.getLongitude());
+						jsonObject.put("latitude", mobilePosition.getLatitude());
+						jsonObject.put("altitude", mobilePosition.getAltitude());
+						jsonObject.put("direction", mobilePosition.getDirection());
+						jsonObject.put("speed", mobilePosition.getSpeed());
+						redisCatchStorage.sendMobilePositionMsg(jsonObject);
+					});
+				}else {
+					// 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+					JSONObject jsonObject = new JSONObject();
+					jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+					jsonObject.put("serial", mobilePosition.getDeviceId());
+					jsonObject.put("code", mobilePosition.getChannelId());
+					jsonObject.put("longitude", mobilePosition.getLongitude());
+					jsonObject.put("latitude", mobilePosition.getLatitude());
+					jsonObject.put("altitude", mobilePosition.getAltitude());
+					jsonObject.put("direction", mobilePosition.getDirection());
+					jsonObject.put("speed", mobilePosition.getSpeed());
+					redisCatchStorage.sendMobilePositionMsg(jsonObject);
+				}
+			} catch (DocumentException e) {
+				logger.error("鏈鐞嗙殑寮傚父 ", e);
+			}
+		}
+		if(!updateChannelMap.isEmpty()) {
+			List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
+			logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size());
+			deviceChannelService.batchUpdateChannelGPS(channels);
+			updateChannelMap.clear();
+		}
+		if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
+			try {
+				logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size());
+				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
+			}catch (Exception e) {
+				logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size());
+			}
+			addMobilePositionList.clear();
+		}
+	}
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index e54aa2d..8a42624 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
@@ -35,6 +37,7 @@
 import javax.sip.header.FromHeader;
 import javax.sip.message.Response;
 import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -76,6 +79,9 @@
 	@Autowired
 	private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
 
+	@Autowired
+	private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
+
 	private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
 
 	@Qualifier("taskExecutor")
@@ -97,61 +103,73 @@
 				responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
 				logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue());
 				return;
-			}else {
+			} else {
 				responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
 			}
 
-		}catch (SipException | InvalidArgumentException | ParseException e) {
+		} catch (SipException | InvalidArgumentException | ParseException e) {
 			logger.error("鏈鐞嗙殑寮傚父 ", e);
 		}
 		boolean runed = !taskQueue.isEmpty();
-		logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size());
 		taskQueue.offer(new HandlerCatchData(evt, null, null));
-		if (!runed) {
-			taskExecutor.execute(()-> {
-				while (!taskQueue.isEmpty()) {
-					try {
-						HandlerCatchData take = taskQueue.poll();
-						if (take == null) {
-							continue;
-						}
-						Element rootElement = getRootElement(take.getEvt());
-						if (rootElement == null) {
-							logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest());
-							continue;
-						}
-						String cmd = XmlUtil.getText(rootElement, "CmdType");
-
-						if (CmdType.CATALOG.equals(cmd)) {
-							logger.info("鎺ユ敹鍒癈atalog閫氱煡");
-							notifyRequestForCatalogProcessor.process(take.getEvt());
-						} else if (CmdType.ALARM.equals(cmd)) {
-							logger.info("鎺ユ敹鍒癆larm閫氱煡");
-							processNotifyAlarm(take.getEvt());
-						} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
-							logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
-							processNotifyMobilePosition(take.getEvt());
-						} else {
-							logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
-						}
-					} catch (DocumentException e) {
-						logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e);
-					}
+	}
+	@Scheduled(fixedRate = 200)   //姣�200姣鎵ц涓�娆�
+	public void executeTaskQueue(){
+		if (taskQueue.isEmpty()) {
+			return;
+		}
+		try {
+			List<RequestEvent> catalogEventList = new ArrayList<>();
+			List<RequestEvent> alarmEventList = new ArrayList<>();
+			List<RequestEvent> mobilePositionEventList = new ArrayList<>();
+			for (HandlerCatchData take : taskQueue) {
+				if (take == null) {
+					continue;
 				}
-			});
+				Element rootElement = getRootElement(take.getEvt());
+				if (rootElement == null) {
+					logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest());
+					continue;
+				}
+				String cmd = XmlUtil.getText(rootElement, "CmdType");
+
+				if (CmdType.CATALOG.equals(cmd)) {
+					catalogEventList.add(take.getEvt());
+				} else if (CmdType.ALARM.equals(cmd)) {
+					alarmEventList.add(take.getEvt());
+				} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+					mobilePositionEventList.add(take.getEvt());
+				} else {
+					logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
+				}
+			}
+			taskQueue.clear();
+			if (!alarmEventList.isEmpty()) {
+				processNotifyAlarm(alarmEventList);
+			}
+			if (!catalogEventList.isEmpty()) {
+				notifyRequestForCatalogProcessor.process(catalogEventList);
+			}
+			if (!mobilePositionEventList.isEmpty()) {
+				notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
+			}
+		} catch (DocumentException e) {
+			logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e);
 		}
 	}
+
+
 
 	/**
 	 * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify
 	 *
 	 * @param evt
 	 */
-	private void processNotifyMobilePosition(RequestEvent evt) {
+	@Async("taskExecutor")
+	public void processNotifyMobilePosition(RequestEvent evt) {
 		try {
 			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
 			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
 			// 鍥炲 200 OK
 			Element rootElement = getRootElement(evt);
 			if (rootElement == null) {
@@ -179,6 +197,13 @@
 			if (device == null) {
 				logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId);
 				return;
+			}
+			// 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌
+			if(deviceId.equals(channelId)) {
+				List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
+				if (deviceChannels.size() == 1) {
+					channelId = deviceChannels.get(0).getChannelId();
+				}
 			}
 			if (!ObjectUtils.isEmpty(device.getName())) {
 				mobilePosition.setDeviceName(device.getName());
@@ -210,8 +235,8 @@
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
-					mobilePosition.getLongitude(), mobilePosition.getLatitude());
+//			logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+//					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 
 			// 鏇存柊device channel 鐨勭粡绾害
@@ -221,12 +246,12 @@
 			deviceChannel.setLongitude(mobilePosition.getLongitude());
 			deviceChannel.setLatitude(mobilePosition.getLatitude());
 			deviceChannel.setGpsTime(mobilePosition.getTime());
-			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-
-			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+//			deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+//
+//			mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+//			mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+//			mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+//			mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
 
 			deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
 
@@ -237,95 +262,97 @@
 
 	/***
 	 * 澶勭悊alarm璁惧鎶ヨNotify
-	 *
-	 * @param evt
 	 */
-	private void processNotifyAlarm(RequestEvent evt) {
+	private void processNotifyAlarm(List<RequestEvent> evtList) {
 		if (!sipConfig.isAlarm()) {
 			return;
 		}
-		try {
-			FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
-			String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+		if (!evtList.isEmpty()) {
+			for (RequestEvent evt : evtList) {
+				try {
+					FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+					String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
 
-			Element rootElement = getRootElement(evt);
-			if (rootElement == null) {
-				logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest());
-				return;
-			}
-			Element deviceIdElement = rootElement.element("DeviceID");
-			String channelId = deviceIdElement.getText().toString();
+					Element rootElement = getRootElement(evt);
+					if (rootElement == null) {
+						logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest());
+						return;
+					}
+					Element deviceIdElement = rootElement.element("DeviceID");
+					String channelId = deviceIdElement.getText().toString();
 
-			Device device = redisCatchStorage.getDevice(deviceId);
-			if (device == null) {
-				logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId);
-				return;
-			}
-			rootElement = getRootElement(evt, device.getCharset());
-			if (rootElement == null) {
-				logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
-				return;
-			}
-			DeviceAlarm deviceAlarm = new DeviceAlarm();
-			deviceAlarm.setDeviceId(deviceId);
-			deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
-			deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
-			String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
-			if (alarmTime == null) {
-				logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
-				return;
-			}
-			deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
-			if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
-				deviceAlarm.setAlarmDescription("");
-			} else {
-				deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
-				deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
-			} else {
-				deviceAlarm.setLongitude(0.00);
-			}
-			if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
-				deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
-			} else {
-				deviceAlarm.setLatitude(0.00);
-			}
-			logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
-			if ("4".equals(deviceAlarm.getAlarmMethod())) {
-				MobilePosition mobilePosition = new MobilePosition();
-				mobilePosition.setChannelId(channelId);
-				mobilePosition.setCreateTime(DateUtil.getNow());
-				mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
-				mobilePosition.setTime(deviceAlarm.getAlarmTime());
-				mobilePosition.setLongitude(deviceAlarm.getLongitude());
-				mobilePosition.setLatitude(deviceAlarm.getLatitude());
-				mobilePosition.setReportSource("GPS Alarm");
+					Device device = redisCatchStorage.getDevice(deviceId);
+					if (device == null) {
+						logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId);
+						return;
+					}
+					rootElement = getRootElement(evt, device.getCharset());
+					if (rootElement == null) {
+						logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
+						return;
+					}
+					DeviceAlarm deviceAlarm = new DeviceAlarm();
+					deviceAlarm.setDeviceId(deviceId);
+					deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
+					deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
+					String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
+					if (alarmTime == null) {
+						logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
+						return;
+					}
+					deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
+					if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
+						deviceAlarm.setAlarmDescription("");
+					} else {
+						deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
+					}
+					if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
+						deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
+					} else {
+						deviceAlarm.setLongitude(0.00);
+					}
+					if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
+						deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
+					} else {
+						deviceAlarm.setLatitude(0.00);
+					}
+					logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
+					if ("4".equals(deviceAlarm.getAlarmMethod())) {
+						MobilePosition mobilePosition = new MobilePosition();
+						mobilePosition.setChannelId(channelId);
+						mobilePosition.setCreateTime(DateUtil.getNow());
+						mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
+						mobilePosition.setTime(deviceAlarm.getAlarmTime());
+						mobilePosition.setLongitude(deviceAlarm.getLongitude());
+						mobilePosition.setLatitude(deviceAlarm.getLatitude());
+						mobilePosition.setReportSource("GPS Alarm");
 
-				// 鏇存柊device channel 鐨勭粡绾害
-				DeviceChannel deviceChannel = new DeviceChannel();
-				deviceChannel.setDeviceId(device.getDeviceId());
-				deviceChannel.setChannelId(channelId);
-				deviceChannel.setLongitude(mobilePosition.getLongitude());
-				deviceChannel.setLatitude(mobilePosition.getLatitude());
-				deviceChannel.setGpsTime(mobilePosition.getTime());
+						// 鏇存柊device channel 鐨勭粡绾害
+						DeviceChannel deviceChannel = new DeviceChannel();
+						deviceChannel.setDeviceId(device.getDeviceId());
+						deviceChannel.setChannelId(channelId);
+						deviceChannel.setLongitude(mobilePosition.getLongitude());
+						deviceChannel.setLatitude(mobilePosition.getLatitude());
+						deviceChannel.setGpsTime(mobilePosition.getTime());
 
-				deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+						deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
 
-				mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-				mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-				mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-				mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+						mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+						mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+						mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+						mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
 
-				deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
+						deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
+					}
+
+					// 鍥炲200 OK
+					if (redisCatchStorage.deviceIsOnline(deviceId)) {
+						publisher.deviceAlarmEventPublish(deviceAlarm);
+					}
+				} catch (DocumentException e) {
+					logger.error("鏈鐞嗙殑寮傚父 ", e);
+				}
 			}
-
-			// 鍥炲200 OK
-			if (redisCatchStorage.deviceIsOnline(deviceId)) {
-				publisher.deviceAlarmEventPublish(deviceAlarm);
-			}
-		} catch (DocumentException e) {
-			logger.error("鏈鐞嗙殑寮傚父 ", e);
 		}
 	}
 
@@ -353,4 +380,9 @@
 	public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
 		this.redisCatchStorage = redisCatchStorage;
 	}
+
+	@Scheduled(fixedRate = 10000)   //姣�1绉掓墽琛屼竴娆�
+	public void execute(){
+		logger.info("[寰呭鐞哊otify娑堟伅鏁伴噺]: {}", taskQueue.size());
+	}
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
index 70702bb..6596f53 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
@@ -532,16 +532,17 @@
                     String status = getText(itemDevice, "Status");
                     if (status != null) {
                         // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR鐨勫吋瀹规�у鐞�
-                        if (status.equals("ON") || status.equals("On") || status.equals("ONLINE") || status.equals("OK")) {
+                        if (status.equalsIgnoreCase("ON") || status.equalsIgnoreCase("On") || status.equalsIgnoreCase("ONLINE") || status.equalsIgnoreCase("OK")) {
                             deviceChannel.setStatus(true);
                         }
-                        if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
+                        if (status.equalsIgnoreCase("OFF") || status.equalsIgnoreCase("Off") || status.equalsIgnoreCase("OFFLINE")) {
                             deviceChannel.setStatus(false);
                         }
                     }else {
                         deviceChannel.setStatus(true);
                     }
-
+//                    logger.info("鐘舵�佸瓧绗︿覆锛� {}", status);
+//                    logger.info("鐘舵�佺粨鏋滐細 {}", deviceChannel.isStatus());
                     // 缁忓害
                     String longitude = getText(itemDevice, "Longitude");
                     if (NumericUtil.isDouble(longitude)) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index f5b1b6c..030dd7d 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -5,6 +5,8 @@
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
+import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -21,6 +23,8 @@
 import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent;
 import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent;
 import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
@@ -66,6 +70,10 @@
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+
+    @Autowired
+    private IRedisRpcService redisRpcService;
+
     @Autowired
     private IInviteStreamService inviteStreamService;
 
@@ -86,9 +94,6 @@
 
     @Autowired
     private EventPublisher eventPublisher;
-
-    @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
 
     @Autowired
     private HookSubscribe subscribe;
@@ -117,6 +122,9 @@
 
     @Autowired
     private ApplicationEventPublisher applicationEventPublisher;
+
+    @Autowired
+    private IStreamPushService streamPushService;
 
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -172,6 +180,54 @@
         if (mediaServer == null) {
             return new HookResultForOnPublish(0, "success");
         }
+        // 鎺ㄦ祦閴存潈鐨勫鐞�
+        if (!"rtp".equals(param.getApp())) {
+            StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
+            if (stream != null) {
+                HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
+                result.setEnable_audio(stream.isEnableAudio());
+                result.setEnable_mp4(stream.isEnableMp4());
+                return result;
+            }
+            if (userSetting.getPushAuthority()) {
+                // 鎺ㄦ祦閴存潈
+                if (param.getParams() == null) {
+                    logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
+                    return new HookResultForOnPublish(401, "Unauthorized");
+                }
+                Map<String, String> paramMap = urlParamToMap(param.getParams());
+                String sign = paramMap.get("sign");
+                if (sign == null) {
+                    logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
+                    return new HookResultForOnPublish(401, "Unauthorized");
+                }
+                // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜
+                String callId = paramMap.get("callId");
+                // 閴存潈閰嶇疆
+                boolean hasAuthority = userService.checkPushAuthority(callId, sign);
+                if (!hasAuthority) {
+                    logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign);
+                    return new HookResultForOnPublish(401, "Unauthorized");
+                }
+                StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
+                streamAuthorityInfo.setCallId(callId);
+                streamAuthorityInfo.setSign(sign);
+                // 閴存潈閫氳繃
+                redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
+            }
+        } else {
+            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
+        }
+
+
+        HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
+        result.setEnable_audio(true);
+        taskExecutor.execute(() -> {
+            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
+            if (subscribe != null) {
+                subscribe.response(mediaInfo, param);
+            }
+        });
 
         ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
         if (resultForOnPublish != null) {
@@ -207,6 +263,221 @@
             MediaDepartureEvent mediaDepartureEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
             applicationEventPublisher.publishEvent(mediaDepartureEvent);
         }
+
+        JSONObject json = (JSONObject) JSON.toJSON(param);
+        taskExecutor.execute(() -> {
+            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
+            MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
+            if (mediaInfo == null) {
+                logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId());
+                return;
+            }
+            if (subscribe != null) {
+                subscribe.response(mediaInfo, param);
+            }
+
+            List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
+            // TODO 閲嶆瀯姝ゅ閫昏緫
+            if (param.isRegist()) {
+                // 澶勭悊娴佹敞鍐岀殑閴存潈淇℃伅锛� 娴佹敞閿�杩欓噷涓嶅啀鍒犻櫎閴存潈淇℃伅锛屼笅娆℃潵浜嗘柊鐨勯壌鏉冧俊鎭細瀵瑰氨鐨勮繘琛岃鐩�
+                if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                        || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                        || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
+                    StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
+                    if (streamAuthorityInfo == null) {
+                        streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
+                    } else {
+                        streamAuthorityInfo.setOriginType(param.getOriginType());
+                        streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr());
+                    }
+                    redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
+                }
+            }
+            if ("rtsp".equals(param.getSchema())) {
+                logger.info("娴佸彉鍖栵細娉ㄥ唽->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
+                if (param.isRegist()) {
+                    mediaServerService.addCount(param.getMediaServerId());
+                } else {
+                    mediaServerService.removeCount(param.getMediaServerId());
+                }
+
+                int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
+                if (updateStatusResult > 0) {
+
+                }
+
+                if ("rtp".equals(param.getApp()) && !param.isRegist()) {
+                    InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
+                    if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
+                        inviteStreamService.removeInviteInfo(inviteInfo);
+                        storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
+                    }
+                } else if ("broadcast".equals(param.getApp())) {
+                    // 璇煶瀵硅鎺ㄦ祦  stream闇�瑕佹弧瓒虫牸寮廳eviceId_channelId
+                    if (param.getStream().indexOf("_") > 0) {
+                        String[] streamArray = param.getStream().split("_");
+                        if (streamArray.length == 2) {
+                            String deviceId = streamArray[0];
+                            String channelId = streamArray[1];
+                            Device device = deviceService.getDevice(deviceId);
+                            if (device != null) {
+                                if (param.isRegist()) {
+                                    if (audioBroadcastManager.exit(deviceId, channelId)) {
+                                        playService.stopAudioBroadcast(deviceId, channelId);
+                                    }
+                                    // 寮�鍚闊冲璁查�氶亾
+                                    try {
+                                        playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> {
+                                            logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId);
+                                        });
+                                    } catch (InvalidArgumentException | ParseException | SipException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 璇煶瀵硅: {}", e.getMessage());
+                                    }
+                                } else {
+                                    // 娴佹敞閿�
+                                    playService.stopAudioBroadcast(deviceId, channelId);
+                                }
+                            } else {
+                                logger.info("[璇煶瀵硅] 鏈壘鍒拌澶囷細{}", deviceId);
+                            }
+                        }
+                    }
+                } else if ("talk".equals(param.getApp())) {
+                    // 璇煶瀵硅鎺ㄦ祦  stream闇�瑕佹弧瓒虫牸寮廳eviceId_channelId
+                    if (param.getStream().indexOf("_") > 0) {
+                        String[] streamArray = param.getStream().split("_");
+                        if (streamArray.length == 2) {
+                            String deviceId = streamArray[0];
+                            String channelId = streamArray[1];
+                            Device device = deviceService.getDevice(deviceId);
+                            if (device != null) {
+                                if (param.isRegist()) {
+                                    if (audioBroadcastManager.exit(deviceId, channelId)) {
+                                        playService.stopAudioBroadcast(deviceId, channelId);
+                                    }
+                                    // 寮�鍚闊冲璁查�氶亾
+                                    playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> {
+                                        logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId);
+                                    });
+                                } else {
+                                    // 娴佹敞閿�
+                                    playService.stopTalk(device, channelId, param.isRegist());
+                                }
+                            } else {
+                                logger.info("[璇煶瀵硅] 鏈壘鍒拌澶囷細{}", deviceId);
+                            }
+                        }
+                    }
+
+                } else {
+                    if (!"rtp".equals(param.getApp())) {
+                        String type = OriginType.values()[param.getOriginType()].getType();
+                        if (param.isRegist()) {
+                            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
+                                    param.getApp(), param.getStream());
+                            String callId = null;
+                            if (streamAuthorityInfo != null) {
+                                callId = streamAuthorityInfo.getCallId();
+                            }
+                            StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
+                                    param.getApp(), param.getStream(), tracks, callId);
+                            param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
+                            redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
+                            if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                                    || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                                    || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
+                                param.setSeverId(userSetting.getServerId());
+                                zlmMediaListManager.addPush(param);
+
+                                // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
+                                redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
+                            }
+                        } else {
+                            // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇
+                            OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
+                                    param.getApp(), param.getStream(), param.getMediaServerId());
+                            if (onStreamChangedHookParam != null) {
+                                type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
+                                redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
+                                if ("PUSH".equalsIgnoreCase(type)) {
+                                    // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
+                                    redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId());
+                                }
+                            }
+                            GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
+                            if (gbStream != null) {
+//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            }
+                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
+                        }
+                        GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
+                        if (gbStream != null) {
+                            if (userSetting.isUsePushingAsStatus()) {
+                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
+                            }
+                        }
+                        if (type != null) {
+                            // 鍙戦�佹祦鍙樺寲redis娑堟伅
+                            JSONObject jsonObject = new JSONObject();
+                            jsonObject.put("serverId", userSetting.getServerId());
+                            jsonObject.put("app", param.getApp());
+                            jsonObject.put("stream", param.getStream());
+                            jsonObject.put("register", param.isRegist());
+                            jsonObject.put("mediaServerId", param.getMediaServerId());
+                            redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                        }
+                    }
+                }
+                if (!param.isRegist()) {
+                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
+                    if (!sendRtpItems.isEmpty()) {
+                        for (SendRtpItem sendRtpItem : sendRtpItems) {
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
+
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
+                                    // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
+
+                                    try {
+                                        if (platform != null) {
+                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        } else {
+                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                if (audioBroadcastCatch != null) {
+                                                    // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+                                                    logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                }
+                                            }
+                                        }
+                                    } catch (SipException | InvalidArgumentException | ParseException |
+                                             SsrcTransactionNotFoundException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+                                    }
+                                }
+
+                            }
+                        }
+                    }
+                }
+            }
+        });
         return HookResult.SUCCESS();
     }
 
@@ -220,6 +491,62 @@
         logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
                 param.getApp(), param.getStream());
         JSONObject ret = new JSONObject();
+        ret.put("code", 0);
+        // 鍥芥爣绫诲瀷鐨勬祦
+        if ("rtp".equals(param.getApp())) {
+            ret.put("close", userSetting.getStreamOnDemand());
+            // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇
+            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
+            // 鐐规挱
+            if (inviteInfo != null) {
+                // 褰曞儚涓嬭浇
+                if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
+                    ret.put("close", false);
+                    return ret;
+                }
+                // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ��
+                if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
+                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
+                            inviteInfo.getChannelId());
+                    if (!sendRtpItems.isEmpty()) {
+                        for (SendRtpItem sendRtpItem : sendRtpItems) {
+                            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                            try {
+                                commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+                            } catch (SipException | InvalidArgumentException | ParseException e) {
+                                logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+                            }
+                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
+                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                        sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+                            }
+                        }
+                    }
+                }
+                Device device = deviceService.getDevice(inviteInfo.getDeviceId());
+                if (device != null) {
+                    try {
+                        // 澶氭煡璇竴娆¢槻姝㈠凡缁忚澶勭悊浜�
+                        InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
+                                inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
+                        if (info != null) {
+                            cmder.streamByeCmd(device, inviteInfo.getChannelId(),
+                                    inviteInfo.getStream(), null);
+                        } else {
+                            logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囩殑鐐规挱淇℃伅锛� {}锛� 娴侊細{}", inviteInfo.getDeviceId(), param.getStream());
+                        }
+                    } catch (InvalidArgumentException | ParseException | SipException |
+                             SsrcTransactionNotFoundException e) {
+                        logger.error("[鏃犱汉瑙傜湅]鐐规挱锛� 鍙戦�丅YE澶辫触 {}", e.getMessage());
+                    }
+                } else {
+                    logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囷細 {}锛屾祦锛歿}", inviteInfo.getDeviceId(), param.getStream());
+                }
 
         boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema());
         ret.put("code", close);
@@ -282,16 +609,22 @@
         if (!"rtp".equals(param.getApp())) {
             return HookResult.SUCCESS();
         }
-        try {
-            MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this);
-            MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
-            if (mediaServerItem != null) {
-                event.setMediaServer(mediaServerItem);
-                applicationEventPublisher.publishEvent(event);
+        taskExecutor.execute(() -> {
+            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
+            if (sendRtpItems.size() > 0) {
+                for (SendRtpItem sendRtpItem : sendRtpItems) {
+                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                    ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+                    try {
+                        commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+                    }
+                    redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
+                            sendRtpItem.getCallId(), sendRtpItem.getStream());
+                }
             }
-        }catch (Exception e) {
-            logger.info("[ZLM-HOOK-rtp鍙戦�佸叧闂璢 鍙戦�侀�氱煡澶辫触 ", e);
-        }
+        });
 
         return HookResult.SUCCESS();
     }
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
deleted file mode 100755
index 80699d2..0000000
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package com.genersoft.iot.vmp.media.zlm;
-
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author lin
- */
-@Component
-public class ZLMMediaListManager {
-
-    private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager");
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private GbStreamMapper gbStreamMapper;
-
-    @Autowired
-    private IStreamPushService streamPushService;
-
-
-    @Autowired
-    private StreamPushMapper streamPushMapper;
-
-    @Autowired
-    private UserSetting userSetting;
-
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
-
-    public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
-        StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam);
-        StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
-        transform.setPushIng(onStreamChangedHookParam.isRegist());
-        transform.setUpdateTime(DateUtil.getNow());
-        transform.setPushTime(DateUtil.getNow());
-        transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId()));
-        if (pushInDb == null) {
-            transform.setCreateTime(DateUtil.getNow());
-            streamPushMapper.add(transform);
-        }else {
-            streamPushMapper.update(transform);
-            gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId());
-        }
-        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
-        if ( channelOnlineEventLister != null)  {
-            try {
-                channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
-            } catch (ParseException e) {
-                logger.error("addPush: ", e);
-            }
-            removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
-        }
-        return transform;
-    }
-
-    public void sendStreamEvent(String app, String stream, String mediaServerId) {
-        MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
-        // 鏌ョ湅鎺ㄦ祦鐘舵��
-        Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream);
-        if (streamReady != null && streamReady) {
-            ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream);
-            if (channelOnlineEventLister != null)  {
-                try {
-                    channelOnlineEventLister.run(app, stream, mediaServerId);
-                } catch (ParseException e) {
-                    logger.error("sendStreamEvent: ", e);
-                }
-                removedChannelOnlineEventLister(app, stream);
-            }
-        }
-    }
-
-    public int removeMedia(String app, String streamId) {
-        // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
-        GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
-        int result;
-        if (gbStream == null) {
-            result = storager.removeMedia(app, streamId);
-        }else {
-            result =storager.mediaOffline(app, streamId);
-        }
-        return result;
-    }
-
-    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
-        this.channelOnPublishEvents.put(app + "_" + stream, callback);
-    }
-
-    public void removedChannelOnlineEventLister(String app, String stream) {
-        this.channelOnPublishEvents.remove(app + "_" + stream);
-    }
-
-    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
-        return this.channelOnPublishEvents.get(app + "_" + stream);
-    }
-
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
index 3ba48d3..7b0bf39 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -264,4 +264,13 @@
         }
         return result;
     }
+
+    public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) {
+        Map<String, Object> param = new HashMap<>();
+        param.put("vhost", "__defaultVhost__");
+        param.put("app", sendRtpItem.getApp());
+        param.put("stream", sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+        return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
index 714838e..6b3c94f 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,5 +1,7 @@
 package com.genersoft.iot.vmp.media.zlm.dto;
 
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
 import java.text.ParseException;
 
 /**
@@ -7,5 +9,5 @@
  */
 public interface ChannelOnlineEvent {
 
-    void run(String app, String stream, String serverId) throws ParseException;
+    void run(SendRtpItem sendRtpItem) throws ParseException;
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
index 16ff831..3ea8e5f 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -99,4 +99,13 @@
     void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
 
     void stopPlay(String deviceId, String channelId);
+    void batchUpdateChannelGPS(List<DeviceChannel> channelList);
+
+    void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
+
+    void online(DeviceChannel channel);
+
+    void offline(DeviceChannel channel);
+
+    void delete(DeviceChannel channel);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index 1507331..c718681 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -115,4 +115,5 @@
     Map<String, StreamPushItem> getAllAppAndStreamMap();
 
 
+    void updatePush(OnStreamChangedHookParam param);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 1a9e3e5..6a4f866 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -61,6 +61,7 @@
         messageForPushChannel.setGbId(gbId);
         messageForPushChannel.setApp(app);
         messageForPushChannel.setStream(stream);
+        messageForPushChannel.setServerId(serverId);
         messageForPushChannel.setMediaServerId(mediaServerId);
         messageForPushChannel.setPlatFormId(platFormId);
         messageForPushChannel.setPlatFormName(platFormName);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
index 6807632..16fc5b5 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -23,6 +23,7 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import java.util.ArrayList;
@@ -248,8 +249,24 @@
     }
 
     @Override
+    public void online(DeviceChannel channel) {
+        channelMapper.online(channel.getDeviceId(), channel.getChannelId());
+    }
+
+    @Override
     public int channelsOffline(List<DeviceChannel> channels) {
         return channelMapper.batchOffline(channels);
+    }
+
+
+    @Override
+    public void offline(DeviceChannel channel) {
+        channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
+    }
+
+    @Override
+    public void delete(DeviceChannel channel) {
+        channelMapper.del(channel.getDeviceId(), channel.getChannelId());
     }
 
     @Override
@@ -358,4 +375,47 @@
     public void stopPlay(String deviceId, String channelId) {
         channelMapper.stopPlay(deviceId, channelId);
     }
+
+    @Override
+    @Transactional
+    public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
+        for (DeviceChannel deviceChannel : channelList) {
+            deviceChannel.setUpdateTime(DateUtil.getNow());
+            if (deviceChannel.getGpsTime() == null) {
+                deviceChannel.setGpsTime(DateUtil.getNow());
+            }
+        }
+        int count = 1000;
+        if (channelList.size() > count) {
+            for (int i = 0; i < channelList.size(); i+=count) {
+                int toIndex = i+count;
+                if ( i + count > channelList.size()) {
+                    toIndex = channelList.size();
+                }
+                List<DeviceChannel> channels = channelList.subList(i, toIndex);
+                channelMapper.batchUpdatePosition(channels);
+            }
+        }else {
+            channelMapper.batchUpdatePosition(channelList);
+        }
+    }
+
+    @Override
+    @Transactional
+    public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
+//        int count = 500;
+//        if (mobilePositions.size() > count) {
+//            for (int i = 0; i < mobilePositions.size(); i+=count) {
+//                int toIndex = i+count;
+//                if ( i + count > mobilePositions.size()) {
+//                    toIndex = mobilePositions.size();
+//                }
+//                List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
+//                deviceMobilePositionMapper.batchadd(mobilePositionsSub);
+//            }
+//        }else {
+//            deviceMobilePositionMapper.batchadd(mobilePositions);
+//        }
+        deviceMobilePositionMapper.batchadd(mobilePositions);
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 8fb772d..47b902e 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -558,6 +558,7 @@
                     removeMobilePositionSubscribe(deviceInStore, result->{
                         // 寮�鍚闃�
                         deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+                        deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                         addMobilePositionSubscribe(deviceInStore);
                         // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹�
                         deviceMapper.updateCustom(deviceInStore);
@@ -566,12 +567,14 @@
                 }else {
                     // 寮�鍚闃�
                     deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+                    deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                     addMobilePositionSubscribe(deviceInStore);
                 }
 
             }else if (device.getSubscribeCycleForMobilePosition() == 0) {
                 // 鍙栨秷璁㈤槄
                 deviceInStore.setSubscribeCycleForMobilePosition(0);
+                deviceInStore.setMobilePositionSubmissionInterval(0);
                 removeMobilePositionSubscribe(deviceInStore, null);
             }
         }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 40de0d2..cd47062 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -28,7 +28,6 @@
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@@ -111,6 +110,14 @@
 
     @Autowired
     private RedisGbPlayMsgListener redisGbPlayMsgListener;
+
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
 
     @Autowired
     private SSRCFactory ssrcFactory;
@@ -266,7 +273,6 @@
             logger.warn("[鐐规挱] 鏈壘鍒板彲鐢ㄧ殑zlm deviceId: {},channelId:{}", deviceId, channelId);
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm");
         }
-
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
             logger.warn("[鐐规挱] 鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦 deviceId: {},channelId:{}", deviceId, channelId);
@@ -280,6 +286,8 @@
         InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
         if (inviteInfo != null ) {
             if (inviteInfo.getStreamInfo() == null) {
+                // 閲婃斁鐢熸垚鐨剆src锛屼娇鐢ㄤ笂涓�娆$敵璇风殑
+                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                 // 鐐规挱鍙戣捣浜嗕絾鏄皻鏈垚鍔�, 浠呮敞鍐屽洖璋冪瓑寰呯粨鏋滃嵆鍙�
                 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                 logger.info("[鐐规挱寮�濮媇 宸茬粡璇锋眰涓紝绛夊緟缁撴灉锛� deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 7a14940..a64b71a 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -633,4 +633,21 @@
     public Map<String, StreamPushItem> getAllAppAndStreamMap() {
         return streamPushMapper.getAllAppAndStreamMap();
     }
+
+    @Override
+    public void updatePush(OnStreamChangedHookParam param) {
+        StreamPushItem transform = transform(param);
+        StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
+        transform.setPushIng(param.isRegist());
+        transform.setUpdateTime(DateUtil.getNow());
+        transform.setPushTime(DateUtil.getNow());
+        transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
+        if (pushInDb == null) {
+            transform.setCreateTime(DateUtil.getNow());
+            streamPushMapper.add(transform);
+        }else {
+            streamPushMapper.update(transform);
+            gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
+        }
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
new file mode 100644
index 0000000..b4bd72c
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -0,0 +1,22 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+
+public interface IRedisRpcService {
+
+    SendRtpItem getSendRtpItem(String sendRtpItemKey);
+
+    WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem);
+
+    WVPResult stopSendRtp(String sendRtpItemKey);
+
+    long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
+
+    void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
+
+    void rtpSendStopped(String sendRtpItemKey);
+
+    void removeCallback(long key);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
deleted file mode 100755
index 68595a8..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ /dev/null
@@ -1,451 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.event.hook.Hook;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.event.hook.HookType;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾�
- * @author lin
- */
-@Component
-public class RedisGbPlayMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
-
-    public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
-
-    /**
-     * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥
-     */
-    public static final  int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
-
-    /**
-     * 绂荤嚎鐨勯敊璇帥
-     */
-    public static final  int ERROR_CODE_OFFLINE = -2;
-
-    /**
-     * 瓒呮椂鐨勯敊璇帥
-     */
-    public static final  int ERROR_CODE_TIMEOUT = -3;
-
-    private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
-    private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
-    private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
-
-    @Autowired
-    private UserSetting userSetting;
-
-
-    @Autowired
-    private RedisTemplate<Object, Object> redisTemplate;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-
-    @Autowired
-    private DynamicTask dynamicTask;
-
-
-    @Autowired
-    private HookSubscribe subscribe;
-
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    public interface PlayMsgCallback{
-        void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
-    }
-
-    public interface PlayMsgCallbackForStartSendRtpStream{
-        void handler();
-    }
-
-    public interface PlayMsgErrorCallback{
-        void handler(WVPResult wvpResult);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
-                        logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg));
-                        if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
-                            continue;
-                        }
-                        if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
-                            logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
-
-                            switch (wvpRedisMsg.getCmd()){
-                                case WvpRedisMsgCmd.GET_SEND_ITEM:
-                                    RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
-                                    requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-                                    break;
-                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
-                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
-                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-                                    break;
-                                case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
-                                    RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
-                                    requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-                                    break;
-                                default:
-                                    break;
-                            }
-
-                        }else {
-                            logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody()));
-                            switch (wvpRedisMsg.getCmd()){
-                                case WvpRedisMsgCmd.GET_SEND_ITEM:
-
-                                   WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
-
-                                    String key = wvpRedisMsg.getSerial();
-                                    switch (content.getCode()) {
-                                        case 0:
-                                           ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
-                                            PlayMsgCallback playMsgCallback = callbacks.get(key);
-                                            if (playMsgCallback != null) {
-                                                callbacksForError.remove(key);
-                                                try {
-                                                    playMsgCallback.handler(responseSendItemMsg);
-                                                } catch (ParseException e) {
-                                                    logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e);
-                                                }
-                                            }
-                                            break;
-                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
-                                        case ERROR_CODE_OFFLINE:
-                                        case ERROR_CODE_TIMEOUT:
-                                            PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
-                                            if (errorCallback != null) {
-                                                callbacks.remove(key);
-                                                errorCallback.handler(content);
-                                            }
-                                            break;
-                                        default:
-                                            break;
-                                    }
-                                    break;
-                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
-                                    WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
-                                    String serial = wvpRedisMsg.getSerial();
-                                    switch (wvpResult.getCode()) {
-                                        case 0:
-                                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
-                                            if (playMsgCallback != null) {
-                                                callbacksForError.remove(serial);
-                                                playMsgCallback.handler();
-                                            }
-                                            break;
-                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
-                                        case ERROR_CODE_OFFLINE:
-                                        case ERROR_CODE_TIMEOUT:
-                                            PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
-                                            if (errorCallback != null) {
-                                                callbacks.remove(serial);
-                                                errorCallback.handler(wvpResult);
-                                            }
-                                            break;
-                                        default:
-                                            break;
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[RedisGbPlayMsg] 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
-    }
-
-    /**
-     * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
-     */
-    private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
-        MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
-        if (mediaServer == null) {
-            // TODO 鍥炲閿欒
-            return;
-        }
-        SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
-
-        try {
-            mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
-        }catch (ControllerException e) {
-            return;
-        }
-
-        // 鍥炲娑堟伅
-        WVPResult<JSONObject> result = new WVPResult<>();
-        result.setCode(0);
-
-        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
-                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
-        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
-        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-    }
-
-    /**
-     * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹�
-     */
-    private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
-        MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
-        if (mediaServerItem == null) {
-            logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId());
-
-            WVPResult<SendRtpItem> result = new WVPResult<>();
-            result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
-            result.setMsg("娴佸獟浣撲笉瀛樺湪");
-
-            WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
-                    WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
-
-            JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
-            redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-            return;
-        }
-        // 纭畾娴佹槸鍚﹀湪绾�
-        Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
-        if (streamReady != null && streamReady) {
-            logger.info("[鍥炲鎺ㄦ祦淇℃伅]  {}/{}", content.getApp(), content.getStream());
-            responseSendItem(mediaServerItem, content, toId, serial);
-        }else {
-            // 娴佸凡缁忕绾�
-            // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
-            logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream());
-
-            String taskKey = UUID.randomUUID().toString();
-            // 璁剧疆瓒呮椂
-            dynamicTask.startDelay(taskKey, ()->{
-                logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream());
-                WVPResult<SendRtpItem> result = new WVPResult<>();
-                result.setCode(ERROR_CODE_TIMEOUT);
-                WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
-                        userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
-                );
-                JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
-                redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-            }, userSetting.getPlatformPlayTimeout());
-
-            // 娣诲姞璁㈤槄
-            Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId());
-            subscribe.addSubscribe(hook, (hookData)->{
-                        dynamicTask.stop(taskKey);
-                        responseSendItem(mediaServerItem, content, toId, serial);
-                    });
-
-            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
-                    content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
-                    content.getMediaServerId());
-
-            String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
-            logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚姹� {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream());
-            redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
-        }
-    }
-
-    /**
-     * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘�
-     */
-    private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
-        SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
-                content.getPort(), content.getSsrc(), content.getPlatformId(),
-                content.getApp(), content.getStream(), content.getChannelId(),
-                content.getTcp(), content.getRtcp());
-
-        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
-        result.setCode(0);
-        ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
-        responseSendItemMsg.setSendRtpItem(sendRtpItem);
-        responseSendItemMsg.setMediaServerItem(mediaServerItem);
-        result.setData(responseSendItemMsg);
-        redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
-        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
-                userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
-        );
-        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
-        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-    }
-
-    /**
-     * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭�
-     * @param serverId 涓嬬骇鏈嶅姟ID
-     * @param app 搴旂敤鍚�
-     * @param stream 娴両D
-     * @param ip 鐩爣IP
-     * @param port 鐩爣绔彛
-     * @param ssrc  ssrc
-     * @param platformId 骞冲彴鍥芥爣缂栧彿
-     * @param channelId 閫氶亾ID
-     * @param isTcp 鏄惁浣跨敤TCP
-     * @param callback 寰楀埌淇℃伅鐨勫洖璋�
-     */
-    public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
-                        String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
-        RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
-                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
-        requestSendItemMsg.setServerId(serverId);
-        String key = UUID.randomUUID().toString();
-        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
-                key, JSON.toJSONString(requestSendItemMsg));
-
-        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
-        logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject);
-        callbacks.put(key, callback);
-        callbacksForError.put(key, errorCallback);
-        dynamicTask.startDelay(key, ()->{
-            callbacks.remove(key);
-            callbacksForError.remove(key);
-            WVPResult<Object> wvpResult = new WVPResult<>();
-            wvpResult.setCode(ERROR_CODE_TIMEOUT);
-            wvpResult.setMsg("timeout");
-            errorCallback.handler(wvpResult);
-        }, userSetting.getPlatformPlayTimeout());
-        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-    }
-
-    /**
-     * 鍙戦�佽姹傛帹娴佺殑娑堟伅
-     * @param param 鎺ㄦ祦鍙傛暟
-     * @param callback 鍥炶皟
-     */
-    public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
-        String key = UUID.randomUUID().toString();
-        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
-                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
-
-        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
-        logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject);
-        dynamicTask.startDelay(key, ()->{
-            callbacksForStartSendRtpStream.remove(key);
-            callbacksForError.remove(key);
-        }, userSetting.getPlatformPlayTimeout());
-        callbacksForStartSendRtpStream.put(key, callback);
-        callbacksForError.put(key, (wvpResult)->{
-            logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg());
-            callbacksForStartSendRtpStream.remove(key);
-            callbacksForError.remove(key);
-        });
-        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-    }
-
-    /**
-     * 鍙戦�佽姹傛帹娴佺殑娑堟伅
-     */
-    public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
-        String key = UUID.randomUUID().toString();
-        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
-                WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
-
-        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
-        logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject);
-        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
-    }
-
-    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
-        if (platformGbId == null) {
-            platformGbId = "*";
-        }
-        if (channelId == null) {
-            channelId = "*";
-        }
-        if (streamId == null) {
-            streamId = "*";
-        }
-        if (callId == null) {
-            callId = "*";
-        }
-        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
-                + userSetting.getServerId() + "_*_"
-                + platformGbId + "_"
-                + channelId + "_"
-                + streamId + "_"
-                + callId;
-        List<Object> scan = RedisUtil.scan(redisTemplate, key);
-        if (scan.size() > 0) {
-            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
-        }else {
-            return null;
-        }
-    }
-
-    /**
-     * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
-     */
-    private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
-        SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
-        if (sendRtpItem == null) {
-            logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL");
-            return;
-        }
-        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-        if (mediaInfo == null) {
-            // TODO 鍥炲閿欒
-            return;
-        }
-
-        if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
-            logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
-            // 鍙戦�乺edis娑堟伅
-            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-                    sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-            messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
-            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
-        }
-
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
deleted file mode 100755
index fcfeff3..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰
- * @author lin
- */
-@Component
-public class RedisPushStreamCloseResponseListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
-
-    @Autowired
-    private IStreamPushService streamPushService;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private ISIPCommanderForPlatform commanderFroPlatform;
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-
-    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
-    public interface PushStreamResponseEvent{
-        void run(MessageForPushChannelResponse response);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody()));
-        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
-        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
-        if (push != null) {
-            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
-                    push.getGbId());
-            if (!sendRtpItems.isEmpty()) {
-                for (SendRtpItem sendRtpItem : sendRtpItems) {
-                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
-                    if (parentPlatform != null) {
-                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
-                        try {
-                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
-                        } catch (SipException | InvalidArgumentException | ParseException e) {
-                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
-                        }
-                    }
-                    if (push.isSelf()) {
-                        // 鍋滄鍚戜笂绾ф帹娴�
-                        logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", sendRtpItem.getStream());
-                        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
-                        mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
-                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
-                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-                                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
-                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-
-    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
-        responseEvents.put(app + stream, callback);
-    }
-
-    public void removeEvent(String app, String stream) {
-        responseEvents.remove(app + stream);
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
deleted file mode 100755
index 85709a7..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
- * @author lin
- */
-@Component
-public class RedisStreamMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        if (steamMsgJson == null) {
-                            logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
-                            continue;
-                        }
-                        String serverId = steamMsgJson.getString("serverId");
-
-                        if (userSetting.getServerId().equals(serverId)) {
-                            // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
-                            continue;
-                        }
-                        logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
-                        String app = steamMsgJson.getString("app");
-                        String stream = steamMsgJson.getString("stream");
-                        boolean register = steamMsgJson.getBoolean("register");
-                        String mediaServerId = steamMsgJson.getString("mediaServerId");
-                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
-                        onStreamChangedHookParam.setSeverId(serverId);
-                        onStreamChangedHookParam.setApp(app);
-                        onStreamChangedHookParam.setStream(stream);
-                        onStreamChangedHookParam.setRegist(register);
-                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
-                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
-                        onStreamChangedHookParam.setAliveSecond(0L);
-                        onStreamChangedHookParam.setTotalReaderCount(0);
-                        onStreamChangedHookParam.setOriginType(0);
-                        onStreamChangedHookParam.setOriginTypeStr("0");
-                        onStreamChangedHookParam.setOriginTypeStr("unknown");
-                        ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
-                        if ( channelOnlineEventLister != null)  {
-                            try {
-                                channelOnlineEventLister.run(app, stream, serverId);;
-                            } catch (ParseException e) {
-                                logger.error("addPush: ", e);
-                            }
-                            zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
new file mode 100644
index 0000000..af79204
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -0,0 +1,304 @@
+package com.genersoft.iot.vmp.service.redisMsg.control;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
+
+/**
+ * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢�
+ */
+@Component
+public class RedisRpcController {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private SendRtpPortManager sendRtpPortManager;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private ZLMServerFactory zlmServerFactory;
+
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+
+    @Autowired
+    private ISIPCommanderForPlatform commanderFroPlatform;
+
+
+    @Autowired
+    private IVideoManagerStorage storager;
+
+
+    /**
+     * 鑾峰彇鍙戞祦鐨勪俊鎭�
+     */
+    public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
+        String sendRtpItemKey = request.getParam().toString();
+        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+        if (sendRtpItem == null) {
+            logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+            return response;
+        }
+        logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+        // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (mediaServerItem == null) {
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        // 鑷钩鍙板唴瀹�
+        int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+        if (localPort == 0) {
+            logger.info("[redis-rpc] getSendRtpItem->鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�" );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+        sendRtpItem.setStatus(1);
+        sendRtpItem.setServerId(userSetting.getServerId());
+        sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+        if (sendRtpItem.getSsrc() == null) {
+            // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+            String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+            sendRtpItem.setSsrc(ssrc);
+        }
+        redisCatchStorage.updateSendRTPSever(sendRtpItem);
+        redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        response.setBody(sendRtpItemKey);
+        return response;
+    }
+
+    /**
+     * 鐩戝惉娴佷笂绾�
+     */
+    public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+        // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (mediaServerItem != null) {
+            logger.info("[redis-rpc] 鐩戝惉娴佷笂绾挎椂鍙戠幇娴佸凡瀛樺湪鐩存帴杩斿洖锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+            if (sendRtpItem.getSsrc() == null) {
+                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItem.getId());
+            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+
+            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+            RedisRpcResponse response = request.getResponse();
+            response.setBody(sendRtpItem.getRedisKey());
+            response.setStatusCode(200);
+        }
+        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+
+        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+            logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡紝娴佸凡涓婄嚎锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+            if (sendRtpItem.getSsrc() == null) {
+                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+
+            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+            RedisRpcResponse response = request.getResponse();
+            response.setBody(sendRtpItem.getRedisKey());
+            response.setStatusCode(200);
+            // 鎵嬪姩鍙戦�佺粨鏋�
+            sendResponse(response);
+            hookSubscribe.removeSubscribe(hook);
+
+        });
+        return null;
+    }
+
+    /**
+     * 鍋滄鐩戝惉娴佷笂绾�
+     */
+    public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        hookSubscribe.removeSubscribe(hook);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        return response;
+    }
+
+
+    /**
+     * 寮�濮嬪彂娴�
+     */
+    public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
+        String sendRtpItemKey = request.getParam().toString();
+        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (sendRtpItem == null) {
+            logger.info("[redis-rpc] 寮�濮嬪彂娴�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅");
+            response.setBody(wvpResult);
+            return response;
+        }
+        logger.info("[redis-rpc] 寮�濮嬪彂娴侊細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[redis-rpc] startSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() );
+            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer");
+            response.setBody(wvpResult);
+            return response;
+        }
+
+        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (!streamReady) {
+            logger.info("[redis-rpc] startSendRtp->娴佷笉鍦ㄧ嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "娴佷笉鍦ㄧ嚎");
+            response.setBody(wvpResult);
+            return response;
+        }
+        JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
+        if (jsonObject.getInteger("code") == 0) {
+            logger.info("[redis-rpc] 鍙戞祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+            WVPResult wvpResult = WVPResult.success();
+            response.setBody(wvpResult);
+        }else {
+            logger.info("[redis-rpc] 鍙戞祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject);
+            WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
+            response.setBody(wvpResult);
+        }
+        return response;
+    }
+
+    /**
+     * 鍋滄鍙戞祦
+     */
+    public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
+        String sendRtpItemKey = request.getParam().toString();
+        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (sendRtpItem == null) {
+            logger.info("[redis-rpc] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅");
+            response.setBody(wvpResult);
+            return response;
+        }
+        logger.info("[redis-rpc] 鍋滄鎺ㄦ祦锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[redis-rpc] stopSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() );
+            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer");
+            response.setBody(wvpResult);
+            return response;
+        }
+        JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
+        if (jsonObject.getInteger("code") == 0) {
+            logger.info("[redis-rpc] 鍋滄鎺ㄦ祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+            response.setBody(WVPResult.success());
+            return response;
+        }else {
+            int code = jsonObject.getInteger("code");
+            String msg = jsonObject.getString("msg");
+            logger.info("[redis-rpc] 鍋滄鎺ㄦ祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� code锛� {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg );
+            response.setBody(WVPResult.fail(code, msg));
+            return response;
+        }
+    }
+
+    /**
+     * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜�
+     */
+    public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
+        String sendRtpItemKey = request.getParam().toString();
+        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (sendRtpItem == null) {
+            logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+            return response;
+        }
+        logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+        String platformId = sendRtpItem.getPlatformId();
+        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+        if (platform == null) {
+            return response;
+        }
+        try {
+            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+            redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+        } catch (SipException | InvalidArgumentException | ParseException e) {
+            logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+        }
+        return response;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        logger.info("[redis-rpc] >> {}", response);
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
new file mode 100644
index 0000000..8ffb562
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -0,0 +1,155 @@
+package com.genersoft.iot.vmp.service.redisMsg.service;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+public class RedisRpcServiceImpl implements IRedisRpcService {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcServiceImpl.class);
+
+    @Autowired
+    private RedisRpcConfig redisRpcConfig;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+    private RedisRpcRequest buildRequest(String uri, Object param) {
+        RedisRpcRequest request = new RedisRpcRequest();
+        request.setFromId(userSetting.getServerId());
+        request.setParam(param);
+        request.setUri(uri);
+        return request;
+    }
+
+    @Override
+    public SendRtpItem getSendRtpItem(String sendRtpItemKey) {
+        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        if (response.getBody() == null) {
+            return null;
+        }
+        return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString());
+    }
+
+    @Override
+    public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
+        logger.info("[璇锋眰鍏朵粬WVP] 寮�濮嬫帹娴侊紝wvp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
+        request.setToId(sendRtpItem.getServerId());
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+    }
+
+    @Override
+    public WVPResult stopSendRtp(String sendRtpItemKey) {
+        SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
+        if (sendRtpItem == null) {
+            logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+            return WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彂娴佷俊鎭�");
+        }
+        logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦锛寃vp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey);
+        request.setToId(sendRtpItem.getServerId());
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+    }
+
+    @Override
+    public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
+        logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+
+            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+            if (sendRtpItem.getSsrc() == null) {
+                // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+            if (callback != null) {
+                callback.run(sendRtpItem.getRedisKey());
+            }
+            hookSubscribe.removeSubscribe(hook);
+            redisRpcConfig.removeCallback(request.getSn());
+        });
+
+        redisRpcConfig.request(request, response -> {
+            if (response.getBody() == null) {
+                logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾�,浣嗘槸鏈壘鍒板彂娴佷俊鎭細{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+                return;
+            }
+            logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾� {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
+
+            if (callback != null) {
+                callback.run(response.getBody().toString());
+            }
+            hookSubscribe.removeSubscribe(hook);
+        });
+        return request.getSn();
+    }
+
+    @Override
+    public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
+        logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        hookSubscribe.removeSubscribe(hook);
+        RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        redisRpcConfig.request(request, 10);
+    }
+
+    @Override
+    public void rtpSendStopped(String sendRtpItemKey) {
+        SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
+        if (sendRtpItem == null) {
+            logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+            return;
+        }
+        RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey);
+        request.setToId(sendRtpItem.getServerId());
+        redisRpcConfig.request(request, 10);
+    }
+
+    @Override
+    public void removeCallback(long key) {
+        redisRpcConfig.removeCallback(key);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 1233623..21911b9 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -45,6 +45,8 @@
 
     void updateSendRTPSever(SendRtpItem sendRtpItem);
 
+    List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId);
+
     /**
      * 鏌ヨRTP鎺ㄩ�佷俊鎭紦瀛�
      * @param platformGbId
@@ -197,6 +199,8 @@
 
     void addDiskInfo(List<Map<String, Object>> diskInfo);
 
+    void deleteSendRTPServer(SendRtpItem sendRtpItem);
+
     List<SendRtpItem> queryAllSendRTPServer();
 
     List<Device> getAllDevices();
@@ -209,7 +213,7 @@
 
     void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
 
-    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
+    void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
 
     void addPushListItem(String app, String stream, MediaArrivalEvent param);
 
@@ -219,4 +223,11 @@
 
     void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
 
+    void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
+
+    SendRtpItem getWaiteSendRtpItem(String app, String stream);
+
+    void sendStartSendRtp(SendRtpItem sendRtpItem);
+
+    void sendPushStreamOnline(SendRtpItem sendRtpItem);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
index c03d73a..10d0ee1 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -52,8 +52,8 @@
             "<if test='status != null'>, status=#{status}</if>" +
             "<if test='streamId != null'>, stream_id=#{streamId}</if>" +
             "<if test='hasAudio != null'>, has_audio=#{hasAudio}</if>" +
-            ", custom_longitude=#{longitude}" +
-            ", custom_latitude=#{latitude}" +
+            "<if test='customLongitude != null'>, custom_longitude=#{customLongitude}</if>" +
+            "<if test='customLatitude != null'>, custom_latitude=#{customLatitude}</if>" +
             "<if test='longitudeGcj02 != null'>, longitude_gcj02=#{longitudeGcj02}</if>" +
             "<if test='latitudeGcj02 != null'>, latitude_gcj02=#{latitudeGcj02}</if>" +
             "<if test='longitudeWgs84 != null'>, longitude_wgs84=#{longitudeWgs84}</if>" +
@@ -89,8 +89,10 @@
             "dc.password, " +
             "COALESCE(dc.custom_ptz_type, dc.ptz_type) AS ptz_type, " +
             "dc.status, " +
-            "COALESCE(dc.custom_longitude, dc.longitude) AS longitude, " +
-            "COALESCE(dc.custom_latitude, dc.latitude) AS latitude, " +
+            "dc.longitude, " +
+            "dc.latitude, " +
+            "dc.custom_longitude, " +
+            "dc.custom_latitude, " +
             "dc.stream_id, " +
             "dc.device_id, " +
             "dc.parental, " +
@@ -345,6 +347,8 @@
             "<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" +
             "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
             "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
+            "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
+            "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
             "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
             "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
             "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
@@ -397,6 +401,23 @@
             " </script>"})
     int updatePosition(DeviceChannel deviceChannel);
 
+    @Update({"<script>" +
+            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
+            " UPDATE" +
+            " wvp_device_channel" +
+            " SET gps_time=#{item.gpsTime}" +
+            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
+            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
+            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
+            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
+            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
+            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
+            "WHERE device_id=#{item.deviceId} " +
+            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
+            "</foreach>" +
+            "</script>"})
+    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
+
     @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
     List<DeviceChannel> getAllChannelInPlay();
 
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
index 7bf243c..c28b16e 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -33,4 +33,33 @@
     @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
     int clearMobilePositionsByDeviceId(String deviceId);
 
+
+    @Insert("<script> " +
+            "insert into wvp_device_mobile_position " +
+            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+            "values " +
+            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+            "#{item.createTime}) " +
+            "</foreach> " +
+            "</script>")
+    void batchadd2(List<MobilePosition> mobilePositions);
+
+    @Insert("<script> " +
+            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+            "insert into wvp_device_mobile_position " +
+            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+            "values " +
+            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+            "#{item.createTime}); " +
+            "</foreach> " +
+            "</script>")
+    void batchadd(List<MobilePosition> mobilePositions);
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 51878c7..6388932 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -12,6 +12,8 @@
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -146,15 +148,26 @@
 
     @Override
     public void updateSendRTPSever(SendRtpItem sendRtpItem) {
+        redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+    }
 
-        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
-                userSetting.getServerId() + "_"
-                + sendRtpItem.getMediaServerId() + "_"
-                + sendRtpItem.getPlatformId() + "_"
-                + sendRtpItem.getChannelId() + "_"
-                + sendRtpItem.getStream() + "_"
-                + sendRtpItem.getCallId();
-        redisTemplate.opsForValue().set(key, sendRtpItem);
+    @Override
+    public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
+        String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+                + userSetting.getServerId() + "_*_"
+                + platformGbId + "_"
+                + channelId + "_"
+                + streamId + "_"
+                + "*";
+        List<SendRtpItem> result = new ArrayList<>();
+        List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
+        if (!scan.isEmpty()) {
+            for (Object o : scan) {
+                String key = (String) o;
+                result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
+            }
+        }
+        return result;
     }
 
     @Override
@@ -172,7 +185,7 @@
             callId = "*";
         }
         String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
-                + userSetting.getServerId() + "_*_"
+                + "*_*_"
                 + platformGbId + "_"
                 + channelId + "_"
                 + streamId + "_"
@@ -268,9 +281,18 @@
         List<Object> scan = RedisUtil.scan(redisTemplate, key);
         if (scan.size() > 0) {
             for (Object keyStr : scan) {
+                logger.info("[鍒犻櫎 redis鐨凷endRTP]锛� {}", keyStr.toString());
                 redisTemplate.delete(keyStr);
             }
         }
+    }
+
+    /**
+     * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛�
+     */
+    @Override
+    public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
+        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
     }
 
     @Override
@@ -555,7 +577,7 @@
     @Override
     public void sendMobilePositionMsg(JSONObject jsonObject) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
-        logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString());
+//        logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString());
         redisTemplate.convertAndSend(key, jsonObject);
     }
 
@@ -646,9 +668,15 @@
     }
 
     @Override
-    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+    public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
+
+        MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
+                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+        msg.setPlatFormIndex(platform.getId());
+
         String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
-        logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+        logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
         redisTemplate.convertAndSend(key, JSON.toJSON(msg));
     }
 
@@ -681,4 +709,30 @@
         logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鍋滄鍚戜笂绾ф帹娴� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
         redisTemplate.convertAndSend(key, JSON.toJSON(msg));
     }
+
+    @Override
+    public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
+        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
+        redisTemplate.opsForValue().set(key, sendRtpItem);
+    }
+
+    @Override
+    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
+        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
+        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
+    }
+
+    @Override
+    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
+        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
+        logger.info("[redis鍙戦�侀�氱煡] 閫氱煡鍏朵粬WVP鎺ㄦ祦 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
+    }
+
+    @Override
+    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
+        logger.info("[redis鍙戦�侀�氱煡] 娴佷笂绾� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
index 7ad595d..59a6943 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
@@ -2,6 +2,7 @@
 
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
 import com.genersoft.iot.vmp.service.ILogService;
 import com.genersoft.iot.vmp.storager.dao.dto.LogDto;
@@ -92,4 +93,14 @@
         logService.clear();
     }
 
+    @Autowired
+    private RedisRpcConfig redisRpcConfig;
+
+    @GetMapping("/test/count")
+    public Object count() {
+        return redisRpcConfig.getCallbackCount();
+    }
+
+
+
 }
diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue
index 02e92e7..ef49205 100755
--- a/web_src/src/components/channelList.vue
+++ b/web_src/src/components/channelList.vue
@@ -326,7 +326,9 @@
             e.ptzType = e.ptzType + "";
             that.$set(e, "edit", false);
             that.$set(e, "location", "");
-            if (e.longitude && e.latitude) {
+            if (e.customLongitude && e.customLatitude) {
+              that.$set(e, "location", e.customLongitude + "," + e.customLatitude);
+            }else if (e.longitude && e.latitude) {
               that.$set(e, "location", e.longitude + "," + e.latitude);
             }
           });
@@ -481,7 +483,9 @@
               e.ptzType = e.ptzType + "";
               this.$set(e, "edit", false);
               this.$set(e, "location", "");
-              if (e.longitude && e.latitude) {
+              if (e.customLongitude && e.customLatitude) {
+                this.$set(e, "location", e.customLongitude + "," + e.customLatitude);
+              }else if (e.longitude && e.latitude) {
                 this.$set(e, "location", e.longitude + "," + e.latitude);
               }
             });
@@ -603,8 +607,8 @@
           this.$message.warning("浣嶇疆淇℃伅鏍煎紡鏈夎锛屼緥锛�117.234,36.378");
           return;
         } else {
-          row.longitude = parseFloat(segements[0]);
-          row.latitude = parseFloat(segements[1]);
+          row.customLongitude = parseFloat(segements[0]);
+          row.custom_latitude = parseFloat(segements[1]);
           if (!(row.longitude && row.latitude)) {
             this.$message.warning("浣嶇疆淇℃伅鏍煎紡鏈夎锛屼緥锛�117.234,36.378");
             return;
diff --git "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql" "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql"
index c229fb1..b14a5c8 100644
--- "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql"
+++ "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql"
@@ -4,5 +4,15 @@
 alter table wvp_device
     drop switch_primary_sub_stream;
 
+# 绗竴涓ˉ涓佸寘
 alter table wvp_platform
-    add send_stream_ip character varying(50);
\ No newline at end of file
+    add send_stream_ip character varying(50);
+
+alter table wvp_device
+    change on_line on_line bool default false;
+
+alter table wvp_device
+    change id id serial primary key;
+
+alter table wvp_device
+    change ssrc_check ssrc_check bool default false;
\ No newline at end of file
diff --git "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql" "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql"
index c229fb1..b14a5c8 100644
--- "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql"
+++ "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql"
@@ -4,5 +4,15 @@
 alter table wvp_device
     drop switch_primary_sub_stream;
 
+# 绗竴涓ˉ涓佸寘
 alter table wvp_platform
-    add send_stream_ip character varying(50);
\ No newline at end of file
+    add send_stream_ip character varying(50);
+
+alter table wvp_device
+    change on_line on_line bool default false;
+
+alter table wvp_device
+    change id id serial primary key;
+
+alter table wvp_device
+    change ssrc_check ssrc_check bool default false;
\ No newline at end of file

--
Gitblit v1.8.0