648540858
2024-04-23 d41d6b34af2485198ed01e1888db1571e4da1a6a
Merge branch 'refs/heads/2.7.0'

# Conflicts:
# src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
# src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
# src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
# src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
# src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
# src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
# src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
# src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
# src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
# src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
# src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
# src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
# src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
# src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
33个文件已修改
8个文件已添加
4个文件已删除
3902 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java 225 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java 93 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 511 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 363 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 199 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 276 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 357 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java 121 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 451 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java 304 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java 155 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 78 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/channelList.vue 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
数据库/2.7.0/更新-mysql-2.7.0.sql 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -72,6 +72,9 @@
    public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
    public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
    public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
    public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
    public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:";
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -66,7 +66,7 @@
    private List<String> allowedOrigins = new ArrayList<>();
    private int maxNotifyCountQueue = 10000;
    private int maxNotifyCountQueue = 100000;
    private int registerAgainAfterTime = 60;
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -29,25 +29,21 @@
    private RedisAlarmMsgListener redisAlarmMsgListener;
    @Autowired
    private RedisStreamMsgListener redisStreamMsgListener;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
    @Autowired
    private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
    @Autowired
    private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
    private RedisRpcConfig redisRpcConfig;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamCloseResponseListener;
    /**
@@ -64,13 +60,11 @@
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
        container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
        container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
        container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
        container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
        container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
        container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
New file
@@ -0,0 +1,225 @@
package com.genersoft.iot.vmp.conf.redis;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
@Component
public class RedisRpcConfig implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
    public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
    private final Random random = new Random();
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private RedisRpcController redisRpcController;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
                        if (redisRpcMessage.getRequest() != null) {
                            handlerRequest(redisRpcMessage.getRequest());
                        } else if (redisRpcMessage.getResponse() != null){
                            handlerResponse(redisRpcMessage.getResponse());
                        } else {
                            logger.error("[redis rpc è§£æžå¤±è´¥] {}", JSON.toJSONString(redisRpcMessage));
                        }
                    } catch (Exception e) {
                        logger.error("[redis rpc è§£æžå¼‚常] ", e);
                    }
                }
            });
        }
    }
    private void handlerResponse(RedisRpcResponse response) {
        if (userSetting.getServerId().equals(response.getToId())) {
            return;
        }
        logger.info("[redis-rpc] << {}", response);
        response(response);
    }
    private void handlerRequest(RedisRpcRequest request) {
        try {
            if (userSetting.getServerId().equals(request.getFromId())) {
                return;
            }
            logger.info("[redis-rpc] << {}", request);
            Method method = getMethod(request.getUri());
            // æ²¡æœ‰æºå¸¦ç›®æ ‡ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404
            if (userSetting.getServerId().equals(request.getToId())) {
                if (method == null) {
                    // å›žå¤404结果
                    RedisRpcResponse response = request.getResponse();
                    response.setStatusCode(404);
                    sendResponse(response);
                    return;
                }
                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
                if(response != null) {
                    sendResponse(response);
                }
            }else {
                if (method == null) {
                    return;
                }
                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
                if (response != null) {
                    sendResponse(response);
                }
            }
        }catch (InvocationTargetException | IllegalAccessException e) {
            logger.error("[redis rpc ] å¤„理请求失败 ", e);
        }
    }
    private Method getMethod(String name) {
        // å¯åŠ¨åŽæ‰«ææ‰€æœ‰çš„è·¯å¾„æ³¨è§£
        Method[] methods = redisRpcController.getClass().getMethods();
        for (Method method : methods) {
            if (method.getName().equals(name)) {
                return method;
            }
        }
        return null;
    }
    private void sendResponse(RedisRpcResponse response){
        logger.info("[redis-rpc] >> {}", response);
        response.setToId(userSetting.getServerId());
        RedisRpcMessage message = new RedisRpcMessage();
        message.setResponse(response);
        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
    }
    private void sendRequest(RedisRpcRequest request){
        logger.info("[redis-rpc] >> {}", request);
        RedisRpcMessage message = new RedisRpcMessage();
        message.setRequest(request);
        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
    }
    private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
    private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
    public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
        request.setSn((long) random.nextInt(1000) + 1);
        SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
        try {
            sendRequest(request);
            return subscribe.poll(timeOut, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
        } finally {
            this.unsubscribe(request.getSn());
        }
        return null;
    }
    public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
        request.setSn((long) random.nextInt(1000) + 1);
        setCallback(request.getSn(), callback);
        sendRequest(request);
    }
    public Boolean response(RedisRpcResponse response) {
        SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
        CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
        if (queue != null) {
            try {
                return queue.offer(response, 2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.error("{}", e.getMessage(), e);
            }
        }else if (callback != null) {
            callback.run(response);
            callbacks.remove(response.getSn());
        }
        return false;
    }
    private void unsubscribe(long key) {
        topicSubscribers.remove(key);
    }
    private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
        SynchronousQueue<RedisRpcResponse> queue = null;
        if (!topicSubscribers.containsKey(key))
            topicSubscribers.put(key, queue = new SynchronousQueue<>());
        return queue;
    }
    private void setCallback(long key, CommonCallback<RedisRpcResponse> callback)  {
        // TODO å¦‚果多个上级点播同一个通道会有问题
        callbacks.put(key, callback);
    }
    public void removeCallback(long key)  {
        callbacks.remove(key);
    }
    public int getCallbackCount(){
        return callbacks.size();
    }
//    @Scheduled(fixedRate = 1000)   //每1秒执行一次
//    public void execute(){
//        logger.info("callbacks的长度: " + callbacks.size());
//        logger.info("队列的长度: " + topicSubscribers.size());
//        logger.info("HOOK监听的长度: " + hookSubscribe.size());
//        logger.info("");
//    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
New file
@@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.conf.redis.bean;
public class RedisRpcMessage {
    private RedisRpcRequest request;
    private RedisRpcResponse response;
    public RedisRpcRequest getRequest() {
        return request;
    }
    public void setRequest(RedisRpcRequest request) {
        this.request = request;
    }
    public RedisRpcResponse getResponse() {
        return response;
    }
    public void setResponse(RedisRpcResponse response) {
        this.response = response;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
New file
@@ -0,0 +1,93 @@
package com.genersoft.iot.vmp.conf.redis.bean;
/**
 * é€šè¿‡redis发送请求
 */
public class RedisRpcRequest {
    /**
     * æ¥è‡ªçš„WVP ID
     */
    private String fromId;
    /**
     * ç›®æ ‡çš„WVP ID
     */
    private String toId;
    /**
     * åºåˆ—号
     */
    private long sn;
    /**
     * è®¿é—®çš„路径
     */
    private String uri;
    /**
     * å‚æ•°
     */
    private Object param;
    public String getFromId() {
        return fromId;
    }
    public void setFromId(String fromId) {
        this.fromId = fromId;
    }
    public String getToId() {
        return toId;
    }
    public void setToId(String toId) {
        this.toId = toId;
    }
    public String getUri() {
        return uri;
    }
    public void setUri(String uri) {
        this.uri = uri;
    }
    public Object getParam() {
        return param;
    }
    public void setParam(Object param) {
        this.param = param;
    }
    public long getSn() {
        return sn;
    }
    public void setSn(long sn) {
        this.sn = sn;
    }
    @Override
    public String toString() {
        return "RedisRpcRequest{" +
                "uri='" + uri + '\'' +
                ", fromId='" + fromId + '\'' +
                ", toId='" + toId + '\'' +
                ", sn=" + sn +
                ", param=" + param +
                '}';
    }
    public RedisRpcResponse getResponse() {
        RedisRpcResponse response = new RedisRpcResponse();
        response.setFromId(fromId);
        response.setToId(toId);
        response.setSn(sn);
        response.setUri(uri);
        return response;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
New file
@@ -0,0 +1,99 @@
package com.genersoft.iot.vmp.conf.redis.bean;
/**
 * A reply sent through redis.
 */
public class RedisRpcResponse {

    /** ID of the WVP node the original request came from. */
    private String fromId;

    /** ID of the target WVP node. */
    private String toId;

    /** Serial number copied from the request this reply answers. */
    private long sn;

    /** Status code of the reply. */
    private int statusCode;

    /** Path that was accessed. */
    private String uri;

    /** Payload of the reply. */
    private Object body;

    public String getFromId() {
        return this.fromId;
    }

    public void setFromId(String fromId) {
        this.fromId = fromId;
    }

    public String getToId() {
        return this.toId;
    }

    public void setToId(String toId) {
        this.toId = toId;
    }

    public long getSn() {
        return this.sn;
    }

    public void setSn(long sn) {
        this.sn = sn;
    }

    public int getStatusCode() {
        return this.statusCode;
    }

    public void setStatusCode(int statusCode) {
        this.statusCode = statusCode;
    }

    public String getUri() {
        return this.uri;
    }

    public void setUri(String uri) {
        this.uri = uri;
    }

    public Object getBody() {
        return this.body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RedisRpcResponse{");
        text.append("uri='").append(uri).append('\'');
        text.append(", fromId='").append(fromId).append('\'');
        text.append(", toId='").append(toId).append('\'');
        text.append(", sn=").append(sn);
        text.append(", statusCode=").append(statusCode);
        text.append(", body=").append(body);
        text.append('}');
        return text.toString();
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
@@ -187,6 +187,18 @@
    private double latitude;
    /**
     * 经度
     */
    @Schema(description = "自定义经度")
    private double customLongitude;
    /**
     * 纬度
     */
    @Schema(description = "自定义纬度")
    private double customLatitude;
    /**
     * 经度 GCJ02
     */
    @Schema(description = "GCJ02坐标系经度")
@@ -226,7 +238,7 @@
     *  是否含有音频
     */
    @Schema(description = "是否含有音频")
    private boolean hasAudio;
    private Boolean hasAudio;
    /**
     * 标记通道的类型,0->国标通道 1->直播流通道 2->业务分组/虚拟组织/行政区划
@@ -586,4 +598,20 @@
    public void setStreamIdentification(String streamIdentification) {
        this.streamIdentification = streamIdentification;
    }
    /**
     * Returns the custom longitude stored on this channel.
     */
    public double getCustomLongitude() {
        return customLongitude;
    }
    /**
     * Sets the custom longitude stored on this channel.
     */
    public void setCustomLongitude(double customLongitude) {
        this.customLongitude = customLongitude;
    }
    /**
     * Returns the custom latitude stored on this channel.
     */
    public double getCustomLatitude() {
        return customLatitude;
    }
    /**
     * Sets the custom latitude stored on this channel.
     */
    public void setCustomLatitude(double customLatitude) {
        this.customLatitude = customLatitude;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -2,6 +2,8 @@
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
public class SendRtpItem {
    /**
@@ -23,6 +25,11 @@
     * 平台id
     */
    private String platformId;
    /**
     * 平台名称
     */
    private String platformName;
     /**
     * 对应设备id
@@ -64,6 +71,11 @@
    private boolean tcpActive;
    /**
     * 自己推流使用的IP
     */
    private String localIp;
    /**
     * 自己推流使用的端口
     */
    private int localPort;
@@ -81,7 +93,7 @@
    /**
     *  invite 的 callId
     */
    private String CallId;
    private String callId;
    /**
     *  invite 的 fromTag
@@ -124,6 +136,11 @@
     */
    private String receiveStream;
    /**
     * 上级的点播类型
     */
    private String sessionName;
    public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
        SendRtpItem sendRtpItem = new SendRtpItem();
        sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
@@ -138,7 +155,7 @@
        sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
        sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
        return sendRtpItem;
    }
    public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
@@ -262,11 +279,11 @@
    }
    public String getCallId() {
        return CallId;
        return callId;
    }
    public void setCallId(String callId) {
        CallId = callId;
        this.callId = callId;
    }
    public InviteStreamType getPlayType() {
@@ -341,6 +358,30 @@
        this.receiveStream = receiveStream;
    }
    /**
     * Returns the platform name.
     */
    public String getPlatformName() {
        return platformName;
    }
    /**
     * Sets the platform name.
     */
    public void setPlatformName(String platformName) {
        this.platformName = platformName;
    }
    /**
     * Returns the local IP this side uses for pushing the stream.
     */
    public String getLocalIp() {
        return localIp;
    }
    /**
     * Sets the local IP this side uses for pushing the stream.
     */
    public void setLocalIp(String localIp) {
        this.localIp = localIp;
    }
    /**
     * Returns the session name, i.e. the play type requested by the upstream platform.
     */
    public String getSessionName() {
        return sessionName;
    }
    /**
     * Sets the session name, i.e. the play type requested by the upstream platform.
     */
    public void setSessionName(String sessionName) {
        this.sessionName = sessionName;
    }
    @Override
    public String toString() {
        return "SendRtpItem{" +
@@ -348,6 +389,7 @@
                ", port=" + port +
                ", ssrc='" + ssrc + '\'' +
                ", platformId='" + platformId + '\'' +
                ", platformName='" + platformName + '\'' +
                ", deviceId='" + deviceId + '\'' +
                ", app='" + app + '\'' +
                ", channelId='" + channelId + '\'' +
@@ -355,10 +397,11 @@
                ", stream='" + stream + '\'' +
                ", tcp=" + tcp +
                ", tcpActive=" + tcpActive +
                ", localIp='" + localIp + '\'' +
                ", localPort=" + localPort +
                ", mediaServerId='" + mediaServerId + '\'' +
                ", serverId='" + serverId + '\'' +
                ", CallId='" + CallId + '\'' +
                ", CallId='" + callId + '\'' +
                ", fromTag='" + fromTag + '\'' +
                ", toTag='" + toTag + '\'' +
                ", pt=" + pt +
@@ -367,6 +410,18 @@
                ", rtcp=" + rtcp +
                ", playType=" + playType +
                ", receiveStream='" + receiveStream + '\'' +
                ", sessionName='" + sessionName + '\'' +
                '}';
    }
    public String getRedisKey() {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                serverId + "_"
                + mediaServerId + "_"
                + platformId + "_"
                + channelId + "_"
                + stream + "_"
                + callId;
        return key;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -49,6 +49,7 @@
        ParentPlatform parentPlatform = null;
        Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
        Map<String, DeviceChannel> channelMap = new HashMap<>();
        if (!ObjectUtils.isEmpty(event.getPlatformId())) {
            subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
            if (subscribe == null) {
@@ -67,6 +68,7 @@
                    for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
                        List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
                        parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
                        channelMap.put(deviceChannel.getChannelId(), deviceChannel);
                    }
                }
            }else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@
                                }
                                logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                List<DeviceChannel> deviceChannelList = new ArrayList<>();
                                DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
                                DeviceChannel deviceChannel = channelMap.get(gbId);
                                deviceChannelList.add(deviceChannel);
                                GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
                                if(gbStream != null){
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -592,6 +592,7 @@
        Integer finalIndex = index;
        String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
                deviceChannels.size(), type, subscribeInfo);
        System.out.println(catalogXmlContent);
        logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size());
        sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
            logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
@@ -621,7 +622,6 @@
    private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
        StringBuffer catalogXml = new StringBuffer(600);
        String characterSet = parentPlatform.getCharacterSet();
        catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
                .append("<Notify>\r\n")
@@ -660,6 +660,8 @@
                                .append("<Owner> " + channel.getOwner()+ "</Owner>\r\n")
                                .append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n")
                                .append("<Address>" + channel.getAddress() + "</Address>\r\n");
                        catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
                        catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
                    }
                    if (!"presence".equals(subscribeInfo.getEventType())) {
                        catalogXml.append("<Event>" + type + "</Event>\r\n");
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -6,7 +6,6 @@
import com.google.common.primitives.Bytes;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.ArrayUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -172,6 +171,7 @@
        return getRootElement(evt, "gb2312");
    }
    public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
        if (charset == null) {
            charset = "gb2312";
        }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -13,10 +13,11 @@
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -51,6 +52,8 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IRedisRpcService redisRpcService;
    @Autowired
    private UserSetting userSetting;
@@ -68,9 +71,6 @@
    private DynamicTask dynamicTask;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private IPlayService playService;
@@ -86,7 +86,7 @@
        logger.info("[收到ACK]: 来自->{}", fromUserId);
        SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
        if (sendRtpItem == null) {
            logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId);
            logger.warn("[收到ACK]:未找到来自{},callId: {}", fromUserId, callIdHeader.getCallId());
            return;
        }
        // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤
@@ -106,10 +106,13 @@
        if (parentPlatform != null) {
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
                redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
                    playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
                });
                WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
                if (wvpResult.getCode() == 0) {
                    RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
                    redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
                        playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
                    });
                }
            } else {
                try {
                    if (sendRtpItem.isTcpActive()) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -16,10 +16,10 @@
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -91,7 +91,8 @@
    private IStreamPushService pushService;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    private IRedisRpcService redisRpcService;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -138,17 +139,8 @@
                    if (userSetting.getUseCustomSsrcForParentInvite()) {
                        mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                    }
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                        messageForPushChannel.setPlatFormIndex(platform.getId());
                        redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                    }else {
                        logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                    }
                }else {
                    logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                }
            }else {
                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -31,11 +31,17 @@
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -51,6 +57,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sdp.*;
@@ -64,6 +71,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Vector;
import java.util.*;
/**
 * SIP命令类型: INVITE请求
@@ -92,16 +100,16 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    private IRedisRpcService redisRpcService;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private IPlayService playService;
@@ -125,17 +133,16 @@
    private UserSetting userSetting;
    @Autowired
    private ZLMMediaListManager mediaListManager;
    @Autowired
    private SipConfig config;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Override
@@ -552,43 +559,79 @@
                    }
                } else if (gbStream != null) {
                    String ssrc;
                    if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    SendRtpItem sendRtpItem = new SendRtpItem();
                    if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
                        sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
                    }
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    sendRtpItem.setTcp(mediaTransmissionTCP);
                    sendRtpItem.setRtcp(platform.isRtcp());
                    sendRtpItem.setPlatformName(platform.getName());
                    sendRtpItem.setPlatformId(platform.getServerGBId());
                    sendRtpItem.setMediaServerId(mediaServerItem.getId());
                    sendRtpItem.setChannelId(channelId);
                    sendRtpItem.setIp(addressStr);
                    sendRtpItem.setPort(port);
                    sendRtpItem.setUsePs(true);
                    sendRtpItem.setApp(gbStream.getApp());
                    sendRtpItem.setStream(gbStream.getStream());
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    sendRtpItem.setOnlyAudio(false);
                    sendRtpItem.setStatus(0);
                    sendRtpItem.setSessionName(sessionName);
                    // 清理可能存在的缓存避免用到旧的数据
                    List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream());
                    if (!sendRtpItemList.isEmpty()) {
                        for (SendRtpItem rtpItem : sendRtpItemList) {
                            redisCatchStorage.deleteSendRTPServer(rtpItem);
                        }
                    }
                    if ("push".equals(gbStream.getStreamType())) {
                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            if (pushListItem != null) {
                                pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId()));
                                // 推流状态
                                pushStream(evt, request, gbStream, pushListItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                                // 开始推流
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                // 未推流 拉起
                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                if (!platform.isStartOfflinePush()) {
                                    // 平台设置中关闭了拉起离线的推流则直接回复
                                    try {
                                        logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
                                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
                                        logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                                    }
                                    return;
                                }
                                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
                        if (null != proxyByAppAndStream) {
                            if (sendRtpItem.getSsrc() == null) {
                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                                String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                                sendRtpItem.setSsrc(ssrc);
                            }
                            if (proxyByAppAndStream.isStatus()) {
                                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
                            } else {
                                //开启代理拉流
                                notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    }
                }
            }
@@ -614,58 +657,13 @@
    /**
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServer mediaServer,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
            if (sendRtpItem == null) {
                logger.warn("服务器端口资源不足");
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                }
                return;
            }
            if (tcpActive != null) {
                sendRtpItem.setTcpActive(tcpActive);
            }
            sendRtpItem.setPlayType(InviteStreamType.PUSH);
            // 写入redis, 超时时回复
            sendRtpItem.setStatus(1);
            sendRtpItem.setCallId(callIdHeader.getCallId());
            sendRtpItem.setFromTag(request.getFromTag());
            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
            if (response != null) {
                sendRtpItem.setToTag(response.getToTag());
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
        }
    }
    private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServer mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
        // 推流
        if (streamPushItem.isSelf()) {
            Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
@@ -674,226 +672,197 @@
                    }
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
            sendRtpItem.setPlayType(InviteStreamType.PROXY);
            // 写入redis, 超时时回复
            sendRtpItem.setStatus(1);
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
            if (response != null) {
                sendRtpItem.setToTag(response.getToTag());
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
        }
    }
    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 推流
        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                    }
                    return;
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                sendRtpItem.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                if (sendRtpItem.getSsrc() == null) {
                    // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                    String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    sendRtpItem.setSsrc(ssrc);
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // 不在线 拉起
                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
            }
        } else {
            // 其他平台内容
            otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            otherWvpPushStream(sendRtpItem, request, platform);
        }
    }
    /**
     * 通知流上线
     */
    private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServer mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        if ("proxy".equals(gbStream.getStreamType())) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            // 监听流上线
            Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId());
            this.hookSubscribe.addSubscribe(hook, (hookData) -> {
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", hookData.getApp(), hookData.getStream());
                dynamicTask.stop(callIdHeader.getCallId());
                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            });
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
                this.hookSubscribe.removeSubscribe(hook);
            }, userSetting.getPlatformPlayTimeout());
            boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
            if (!start) {
                try {
                    responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                this.hookSubscribe.removeSubscribe(hook);
                dynamicTask.stop(callIdHeader.getCallId());
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // TODO 控制启用以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        // 监听流上线
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
        zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
            OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
            logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
            dynamicTask.stop(sendRtpItem.getCallId());
            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
        });
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
        }, userSetting.getPlatformPlayTimeout());
        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!start) {
            try {
                responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
                try {
                    logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                return;
            }
            // 发送redis消息以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                    platform.getName(), null, gbStream.getMediaServerId());
            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
            // 设置超时
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                try {
                    redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
            // 添加监听
            int finalPort = port;
            Boolean finalTcpActive = tcpActive;
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
                    if (finalTcpActive != null) {
                        sendRtpItem.setTcpActive(finalTcpActive);
                    }
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    // 写入redis, 超时时回复
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
                    if (response != null) {
                        sendRtpItem.setToTag(response.getToTag());
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                } else {
                    // 其他平台内容
                    otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                }
            });
            // 添加回复的拒绝或者错误的通知
            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
                if (response.getCode() != 0) {
                    dynamicTask.stop(callIdHeader.getCallId());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    try {
                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
                    }
                }
            });
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            dynamicTask.stop(sendRtpItem.getCallId());
        }
    }
    /**
     * 来自其他wvp的推流
     * 通知流上线
     */
    private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServer mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
                    // 收到sendItem
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    // 写入redis, 超时时回复
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
                    if (response != null) {
                        sendRtpItem.setToTag(response.getToTag());
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                }, (wvpResult) -> {
                    // é”™è¯¯
                    if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                        // ç¦»çº¿
                        // æŸ¥è¯¢æ˜¯å¦åœ¨æœ¬æœºä¸Šçº¿äº†
                        StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                        if (currentStreamPushItem.isPushIng()) {
                            // åœ¨çº¿çŠ¶æ€
                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // ä¸åœ¨çº¿ æ‹‰èµ·
                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    }
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // å‘送redis消息以使设备上线,流上线后被
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
                platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // è®¾ç½®è¶…æ—¶
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
            logger.info("[ app={}, stream={} ] ç­‰å¾…设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                responseAck(request, Response.REQUEST_TIMEOUT); // è¶…æ—¶
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("未处理的异常 ", e);
            }
        }, userSetting.getPlatformPlayTimeout());
        //
        long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            if (sendRtpItemKey == null) {
                logger.warn("[级联点播] ç­‰å¾…推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
                return;
            }
            SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
            if (sendRtpItemFromRedis == null) {
                logger.warn("[级联点播] ç­‰å¾…推流, æœªæ‰¾åˆ°redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
                return;
            }
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                logger.info("[级联点播] ç­‰å¾…的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (InvalidArgumentException | ParseException | SipException e) {
                        logger.error("[命令发送失败] å›½æ ‡çº§è” ç‚¹æ’­å›žå¤ BUSY_HERE: {}", e.getMessage());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("未处理的异常 ", e);
                    }
                });
                    return;
                }
                sendRtpItem.setLocalPort(localPort);
                if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
                    sendRtpItem.setLocalIp(platform.getSendStreamIp());
                }
                // å†™å…¥redis, è¶…时时回复
                sendRtpItem.setStatus(1);
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // å…¶ä»–平台内容
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
            }
        });
        // æ·»åŠ å›žå¤çš„æ‹’ç»æˆ–è€…é”™è¯¯çš„é€šçŸ¥
        // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE  '{"code":1,"msg":"失败","app":"1","stream":"2"}'
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(sendRtpItem.getCallId());
                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
                redisRpcService.removeCallback(key);
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] å›½æ ‡çº§è” ç‚¹æ’­å›žå¤: {}", e.getMessage());
                }
            }
        });
    }
    public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
        String sdpIp = mediaServerItem.getSdpIp();
    /**
     * æ¥è‡ªå…¶ä»–wvp的推流
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播] æ¥è‡ªå…¶ä»–wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey());
        if (sendRtpItem == null) {
            return;
        }
        // å†™å…¥redis, è¶…时时回复
        sendRtpItem.setStatus(1);
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
        if (response != null) {
            sendRtpItem.setToTag(response.getToTag());
        }
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
    }
    public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
        String sdpIp = sendRtpItem.getLocalIp();
        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
            sdpIp = platform.getSendStreamIp();
        }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -20,10 +18,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,8 +37,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
    private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
    private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
    private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
    private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -60,275 +56,110 @@
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private CivilCodeFileConf civilCodeFileConf;
    @Autowired
    private SipConfig sipConfig;
    private final static String talkKey = "notify-request-for-catalog-task";
    @Transactional
    public void process(List<RequestEvent> evtList) {
        if (evtList.isEmpty()) {
            return;
        }
        for (RequestEvent evt : evtList) {
            try {
                long start = System.currentTimeMillis();
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
    public void process(RequestEvent evt) {
        try {
            long start = System.currentTimeMillis();
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
                Device device = redisCatchStorage.getDevice(deviceId);
                if (device == null || !device.isOnLine()) {
                    logger.warn("[收到目录订阅]:{}, ä½†æ˜¯è®¾å¤‡å·²ç»ç¦»çº¿", (device != null ? device.getDeviceId():"" ));
                    return;
                }
                Element rootElement = getRootElement(evt, device.getCharset());
                if (rootElement == null) {
                    logger.warn("[ æ”¶åˆ°ç›®å½•订阅 ] content cannot be null, {}", evt.getRequest());
                    return;
                }
                Element deviceListElement = rootElement.element("DeviceList");
                if (deviceListElement == null) {
                    return;
                }
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null || !device.isOnLine()) {
                logger.warn("[收到目录订阅]:{}, ä½†æ˜¯è®¾å¤‡å·²ç»ç¦»çº¿", (device != null ? device.getDeviceId():"" ));
                return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
                logger.warn("[ æ”¶åˆ°ç›®å½•订阅 ] content cannot be null, {}", evt.getRequest());
                return;
            }
            Element deviceListElement = rootElement.element("DeviceList");
            if (deviceListElement == null) {
                return;
            }
            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            if (deviceListIterator != null) {
                // éåކDeviceList
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
                    }
                    Element eventElement = itemDevice.element("Event");
                    String event;
                    if (eventElement == null) {
                        logger.warn("[收到目录订阅]:{}, ä½†æ˜¯Event为空, è®¾ä¸ºé»˜è®¤å€¼ ADD", (device != null ? device.getDeviceId():"" ));
                        event = CatalogEvent.ADD;
                    }else {
                        event = eventElement.getText().toUpperCase();
                    }
                    DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
                    if (channel == null) {
                        logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                        continue;
                    }
                    if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                        channel.setParentId(null);
                    }
                    channel.setDeviceId(device.getDeviceId());
                    logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    switch (event) {
                        case CatalogEvent.ON:
                            // ä¸Šçº¿
                            logger.info("[收到通道上线通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            updateChannelOnlineList.add(channel);
                            if (updateChannelOnlineList.size() > 300) {
                                executeSaveForOnline();
                            }
                            if (userSetting.getDeviceStatusNotify()) {
                                // å‘送redis消息
                                redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                            }
                            break;
                        case CatalogEvent.OFF :
                            // ç¦»çº¿
                            logger.info("[收到通道离线通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道离线通知] ä½†æ˜¯å¹³å°å·²é…ç½®æ‹’绝此消息,来自设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.VLOST:
                            // è§†é¢‘丢失
                            logger.info("[收到通道视频丢失通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道视频丢失通知] ä½†æ˜¯å¹³å°å·²é…ç½®æ‹’绝此消息,来自设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.DEFECT:
                            // æ•…éšœ
                            logger.info("[收到通道视频故障通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道视频故障通知] ä½†æ˜¯å¹³å°å·²é…ç½®æ‹’绝此消息,来自设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.ADD:
                            // å¢žåŠ 
                            logger.info("[收到增加通道通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            // åˆ¤æ–­æ­¤é€šé“是否存在
                            DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                            if (deviceChannel != null) {
                                logger.info("[增加通道] å·²å­˜åœ¨ï¼Œä¸å‘送通知只更新,设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                                channel.setId(deviceChannel.getId());
                                updateChannelMap.put(channel.getChannelId(), channel);
                                if (updateChannelMap.keySet().size() > 300) {
                                    executeSaveForUpdate();
                                }
                            }else {
                                addChannelMap.put(channel.getChannelId(), channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                if (addChannelMap.keySet().size() > 300) {
                                    executeSaveForAdd();
                                }
                            }
                            break;
                        case CatalogEvent.DEL:
                            // åˆ é™¤
                            logger.info("[收到删除通道通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            deleteChannelList.add(channel);
                            if (userSetting.getDeviceStatusNotify()) {
                                // å‘送redis消息
                                redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                            }
                            if (deleteChannelList.size() > 300) {
                                executeSaveForDelete();
                            }
                            break;
                        case CatalogEvent.UPDATE:
                            // æ›´æ–°
                            logger.info("[收到更新通道通知] æ¥è‡ªè®¾å¤‡: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                            // åˆ¤æ–­æ­¤é€šé“是否存在
                            DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                            if (deviceChannelForUpdate != null) {
                                channel.setId(deviceChannelForUpdate.getId());
                                channel.setUpdateTime(DateUtil.getNow());
                                updateChannelMap.put(channel.getChannelId(), channel);
                                if (updateChannelMap.keySet().size() > 300) {
                                    executeSaveForUpdate();
                                }
                            }else {
                                addChannelMap.put(channel.getChannelId(), channel);
                                if (addChannelMap.keySet().size() > 300) {
                                    executeSaveForAdd();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                            }
                            break;
                        default:
                            logger.warn("[ NotifyCatalog ] event not found ï¼š {}", event );
                    }
                    // è½¬å‘变化信息
                    eventPublisher.catalogEventPublish(null, channel, event);
                    if (!updateChannelMap.keySet().isEmpty()
                            || !addChannelMap.keySet().isEmpty()
                            || !updateChannelOnlineList.isEmpty()
                            || !updateChannelOfflineList.isEmpty()
                            || !deleteChannelList.isEmpty()) {
                        if (!dynamicTask.contains(talkKey)) {
                            dynamicTask.startDelay(talkKey, this::executeSave, 1000);
                    // éåކDeviceList
                    while (deviceListIterator.hasNext()) {
                        Element itemDevice = deviceListIterator.next();
                        Element eventElement = itemDevice.element("Event");
                        String event;
                        if (eventElement == null) {
                            logger.warn("[收到目录订阅]:{}, ä½†æ˜¯Event为空, è®¾ä¸ºé»˜è®¤å€¼ ADD", (device != null ? device.getDeviceId():"" ));
                            event = CatalogEvent.ADD;
                        }else {
                            event = eventElement.getText().toUpperCase();
                        }
                        DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
                        if (channel == null) {
                            logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                            continue;
                        }
                        if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                            channel.setParentId(null);
                        }
                        channel.setDeviceId(device.getDeviceId());
                        logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
                        switch (event) {
                            case CatalogEvent.ON:
                                // ä¸Šçº¿
                                deviceChannelService.online(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                break;
                            case CatalogEvent.OFF :
                            case CatalogEvent.VLOST:
                            case CatalogEvent.DEFECT:
                                // ç¦»çº¿
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[目录订阅] ç¦»çº¿ ä½†æ˜¯å¹³å°å·²é…ç½®æ‹’绝此消息,来自设备: {}, é€šé“ {}", device.getDeviceId(), channel.getChannelId());
                                }else {
                                    deviceChannelService.offline(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // å‘送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.DEL:
                                // åˆ é™¤
                                deviceChannelService.delete(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                                }
                                break;
                            case CatalogEvent.ADD:
                            case CatalogEvent.UPDATE:
                                // æ›´æ–°
                                channel.setUpdateTime(DateUtil.getNow());
                                deviceChannelService.updateChannel(deviceId,channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // å‘送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                break;
                            default:
                                logger.warn("[ NotifyCatalog ] event not found ï¼š {}", event );
                        }
                        // è½¬å‘变化信息
                        eventPublisher.catalogEventPublish(null, channel, event);
                    }
                }
            } catch (DocumentException e) {
                logger.error("未处理的异常 ", e);
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
    private void executeSave(){
        try {
            executeSaveForAdd();
        } catch (Exception e) {
            logger.error("[存储收到的增加通道] å¼‚常: ", e );
        }
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] å¼‚常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] å¼‚常: ", e );
        }
        try {
            executeSaveForOnline();
        } catch (Exception e) {
            logger.error("[存储收到的通道上线] å¼‚常: ", e );
        }
        try {
            executeSaveForOffline();
        } catch (Exception e) {
            logger.error("[存储收到的通道离线] å¼‚常: ", e );
        }
        dynamicTask.stop(talkKey);
    }
    private void executeSaveForUpdate(){
        if (!updateChannelMap.values().isEmpty()) {
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            updateChannelMap.clear();
            deviceChannelService.batchUpdateChannel(deviceChannels);
        }
    }
    private void executeSaveForAdd(){
        if (!addChannelMap.values().isEmpty()) {
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
            addChannelMap.clear();
            deviceChannelService.batchAddChannel(deviceChannels);
        }
    }
    private void executeSaveForDelete(){
        if (!deleteChannelList.isEmpty()) {
            deviceChannelService.deleteChannels(deleteChannelList);
            deleteChannelList.clear();
        }
    }
    private void executeSaveForOnline(){
        if (!updateChannelOnlineList.isEmpty()) {
            deviceChannelService.channelsOnline(updateChannelOnlineList);
            updateChannelOnlineList.clear();
        }
    }
    private void executeSaveForOffline(){
        if (!updateChannelOfflineList.isEmpty()) {
            deviceChannelService.channelsOffline(updateChannelOfflineList);
            updateChannelOfflineList.clear();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
New file
@@ -0,0 +1,199 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
 */
@Component
public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent {
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IDeviceChannelService deviceChannelService;
    @Transactional
    public void process(List<RequestEvent> eventList) {
        if (eventList.isEmpty()) {
            return;
        }
        Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
        List<MobilePosition> addMobilePositionList = new ArrayList<>();
        for (RequestEvent evt : eventList) {
            try {
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
                long startTime = System.currentTimeMillis();
                // å›žå¤ 200 OK
                Element rootElement = getRootElement(evt);
                if (rootElement == null) {
                    logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
                    return;
                }
                Device device = redisCatchStorage.getDevice(deviceId);
                if (device == null) {
                    logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
                    return;
                }
                MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setDeviceId(device.getDeviceId());
                mobilePosition.setDeviceName(device.getName());
                mobilePosition.setCreateTime(DateUtil.getNow());
                List<Element> elements = rootElement.elements();
                for (Element element : elements) {
                    switch (element.getName()){
                        case "DeviceID":
                            String channelId = element.getStringValue();
                            if (!deviceId.equals(channelId)) {
                                mobilePosition.setChannelId(channelId);
                            }
                            continue;
                        case "Time":
                            String timeVal = element.getStringValue();
                            if (ObjectUtils.isEmpty(timeVal)) {
                                mobilePosition.setTime(DateUtil.getNow());
                            } else {
                                mobilePosition.setTime(SipUtils.parseTime(timeVal));
                            }
                            continue;
                        case "Longitude":
                            mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                            continue;
                        case "Latitude":
                            mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                            continue;
                        case "Speed":
                            String speedVal = element.getStringValue();
                            if (NumericUtil.isDouble(speedVal)) {
                                mobilePosition.setSpeed(Double.parseDouble(speedVal));
                            } else {
                                mobilePosition.setSpeed(0.0);
                            }
                            continue;
                        case "Direction":
                            String directionVal = element.getStringValue();
                            if (NumericUtil.isDouble(directionVal)) {
                                mobilePosition.setDirection(Double.parseDouble(directionVal));
                            } else {
                                mobilePosition.setDirection(0.0);
                            }
                            continue;
                        case "Altitude":
                            String altitudeVal = element.getStringValue();
                            if (NumericUtil.isDouble(altitudeVal)) {
                                mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                            } else {
                                mobilePosition.setAltitude(0.0);
                            }
                            continue;
                    }
                }
//            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, æ—¶é—´ï¼š {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//                    mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
                mobilePosition.setReportSource("Mobile Position");
                // 更新device channel 的经纬度
                DeviceChannel deviceChannel = new DeviceChannel();
                deviceChannel.setDeviceId(device.getDeviceId());
                deviceChannel.setLongitude(mobilePosition.getLongitude());
                deviceChannel.setLatitude(mobilePosition.getLatitude());
                deviceChannel.setGpsTime(mobilePosition.getTime());
                updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
                addMobilePositionList.add(mobilePosition);
                // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
                try {
                    eventPublisher.mobilePositionEventPublish(mobilePosition);
                }catch (Exception e) {
                    logger.error("[向上级转发移动位置失败] ", e);
                }
                if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) {
                    List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
                    channels.forEach(channel -> {
                        // 发送redis消息。 通知位置信息的变化
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                        jsonObject.put("serial", channel.getDeviceId());
                        jsonObject.put("code", channel.getChannelId());
                        jsonObject.put("longitude", mobilePosition.getLongitude());
                        jsonObject.put("latitude", mobilePosition.getLatitude());
                        jsonObject.put("altitude", mobilePosition.getAltitude());
                        jsonObject.put("direction", mobilePosition.getDirection());
                        jsonObject.put("speed", mobilePosition.getSpeed());
                        redisCatchStorage.sendMobilePositionMsg(jsonObject);
                    });
                }else {
                    // 发送redis消息。 通知位置信息的变化
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                    jsonObject.put("serial", mobilePosition.getDeviceId());
                    jsonObject.put("code", mobilePosition.getChannelId());
                    jsonObject.put("longitude", mobilePosition.getLongitude());
                    jsonObject.put("latitude", mobilePosition.getLatitude());
                    jsonObject.put("altitude", mobilePosition.getAltitude());
                    jsonObject.put("direction", mobilePosition.getDirection());
                    jsonObject.put("speed", mobilePosition.getSpeed());
                    redisCatchStorage.sendMobilePositionMsg(jsonObject);
                }
            } catch (DocumentException e) {
                logger.error("未处理的异常 ", e);
            }
        }
        if(!updateChannelMap.isEmpty()) {
            List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
            logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
            deviceChannelService.batchUpdateChannelGPS(channels);
            updateChannelMap.clear();
        }
        if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
            try {
                logger.info("[移动位置订阅] æ·»åŠ é€šé“è½¨è¿¹ç‚¹ä½ï¼š {}", addMobilePositionList.size());
                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
            }catch (Exception e) {
                logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
            }
            addMobilePositionList.clear();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -35,6 +37,7 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -76,6 +79,9 @@
    @Autowired
    private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
    @Autowired
    private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -97,61 +103,73 @@
                responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
                logger.error("[notify] å¾…处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
                return;
            }else {
            } else {
                responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
            }
        }catch (SipException | InvalidArgumentException | ParseException e) {
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("未处理的异常 ", e);
        }
        boolean runed = !taskQueue.isEmpty();
        logger.info("[notify] å¾…处理消息数量: {}", taskQueue.size());
        taskQueue.offer(new HandlerCatchData(evt, null, null));
        if (!runed) {
            taskExecutor.execute(()-> {
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
                        if (take == null) {
                            continue;
                        }
                        Element rootElement = getRootElement(take.getEvt());
                        if (rootElement == null) {
                            logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                            continue;
                        }
                        String cmd = XmlUtil.getText(rootElement, "CmdType");
                        if (CmdType.CATALOG.equals(cmd)) {
                            logger.info("接收到Catalog通知");
                            notifyRequestForCatalogProcessor.process(take.getEvt());
                        } else if (CmdType.ALARM.equals(cmd)) {
                            logger.info("接收到Alarm通知");
                            processNotifyAlarm(take.getEvt());
                        } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                            logger.info("接收到MobilePosition通知");
                            processNotifyMobilePosition(take.getEvt());
                        } else {
                            logger.info("接收到消息:" + cmd);
                        }
                    } catch (DocumentException e) {
                        logger.error("处理NOTIFY消息时错误", e);
                    }
    }
    @Scheduled(fixedRate = 200)   //每200毫秒执行一次
    public void executeTaskQueue(){
        if (taskQueue.isEmpty()) {
            return;
        }
        try {
            List<RequestEvent> catalogEventList = new ArrayList<>();
            List<RequestEvent> alarmEventList = new ArrayList<>();
            List<RequestEvent> mobilePositionEventList = new ArrayList<>();
            for (HandlerCatchData take : taskQueue) {
                if (take == null) {
                    continue;
                }
            });
                Element rootElement = getRootElement(take.getEvt());
                if (rootElement == null) {
                    logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                    continue;
                }
                String cmd = XmlUtil.getText(rootElement, "CmdType");
                if (CmdType.CATALOG.equals(cmd)) {
                    catalogEventList.add(take.getEvt());
                } else if (CmdType.ALARM.equals(cmd)) {
                    alarmEventList.add(take.getEvt());
                } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                    mobilePositionEventList.add(take.getEvt());
                } else {
                    logger.info("接收到消息:" + cmd);
                }
            }
            taskQueue.clear();
            if (!alarmEventList.isEmpty()) {
                processNotifyAlarm(alarmEventList);
            }
            if (!catalogEventList.isEmpty()) {
                notifyRequestForCatalogProcessor.process(catalogEventList);
            }
            if (!mobilePositionEventList.isEmpty()) {
                notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
            }
        } catch (DocumentException e) {
            logger.error("处理NOTIFY消息时错误", e);
        }
    }
    /**
     * 处理MobilePosition移动位置Notify
     *
     * @param evt
     */
    private void processNotifyMobilePosition(RequestEvent evt) {
    @Async("taskExecutor")
    public void processNotifyMobilePosition(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
@@ -179,6 +197,13 @@
            if (device == null) {
                logger.warn("[mobilePosition移动位置Notify] æœªæ‰¾åˆ°é€šé“{}所属的设备", channelId);
                return;
            }
            // 兼容设备部分设备上报是通道编号与设备编号一致的情况
            if(deviceId.equals(channelId)) {
                List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                if (deviceChannels.size() == 1) {
                    channelId = deviceChannels.get(0).getChannelId();
                }
            }
            if (!ObjectUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
@@ -210,8 +235,8 @@
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
//            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
            mobilePosition.setReportSource("Mobile Position");
            // 更新device channel 的经纬度
@@ -221,12 +246,12 @@
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//            deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
@@ -237,95 +262,97 @@
    /***
     * å¤„理alarm设备报警Notify
     *
     * @param evt
     */
    private void processNotifyAlarm(RequestEvent evt) {
    private void processNotifyAlarm(List<RequestEvent> evtList) {
        if (!sipConfig.isAlarm()) {
            return;
        }
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
        if (!evtList.isEmpty()) {
            for (RequestEvent evt : evtList) {
                try {
                    FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                    String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
                logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
                return;
            }
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getText().toString();
                    Element rootElement = getRootElement(evt);
                    if (rootElement == null) {
                        logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
                        return;
                    }
                    Element deviceIdElement = rootElement.element("DeviceID");
                    String channelId = deviceIdElement.getText().toString();
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                logger.warn("[ NotifyAlarm ] æœªæ‰¾åˆ°è®¾å¤‡ï¼š{}", deviceId);
                return;
            }
            rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
                logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
                return;
            }
            DeviceAlarm deviceAlarm = new DeviceAlarm();
            deviceAlarm.setDeviceId(deviceId);
            deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
            deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
            String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
            if (alarmTime == null) {
                logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
                return;
            }
            deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
            if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
                deviceAlarm.setAlarmDescription("");
            } else {
                deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
                deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            } else {
                deviceAlarm.setLongitude(0.00);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
                deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            } else {
                deviceAlarm.setLatitude(0.00);
            }
            logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
            if ("4".equals(deviceAlarm.getAlarmMethod())) {
                MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setChannelId(channelId);
                mobilePosition.setCreateTime(DateUtil.getNow());
                mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                mobilePosition.setTime(deviceAlarm.getAlarmTime());
                mobilePosition.setLongitude(deviceAlarm.getLongitude());
                mobilePosition.setLatitude(deviceAlarm.getLatitude());
                mobilePosition.setReportSource("GPS Alarm");
                    Device device = redisCatchStorage.getDevice(deviceId);
                    if (device == null) {
                        logger.warn("[ NotifyAlarm ] æœªæ‰¾åˆ°è®¾å¤‡ï¼š{}", deviceId);
                        return;
                    }
                    rootElement = getRootElement(evt, device.getCharset());
                    if (rootElement == null) {
                        logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
                        return;
                    }
                    DeviceAlarm deviceAlarm = new DeviceAlarm();
                    deviceAlarm.setDeviceId(deviceId);
                    deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
                    deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
                    String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
                    if (alarmTime == null) {
                        logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
                        return;
                    }
                    deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
                    if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
                        deviceAlarm.setAlarmDescription("");
                    } else {
                        deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
                    }
                    if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
                        deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
                    } else {
                        deviceAlarm.setLongitude(0.00);
                    }
                    if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
                        deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
                    } else {
                        deviceAlarm.setLatitude(0.00);
                    }
                    logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
                    if ("4".equals(deviceAlarm.getAlarmMethod())) {
                        MobilePosition mobilePosition = new MobilePosition();
                        mobilePosition.setChannelId(channelId);
                        mobilePosition.setCreateTime(DateUtil.getNow());
                        mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
                        mobilePosition.setTime(deviceAlarm.getAlarmTime());
                        mobilePosition.setLongitude(deviceAlarm.getLongitude());
                        mobilePosition.setLatitude(deviceAlarm.getLatitude());
                        mobilePosition.setReportSource("GPS Alarm");
                // æ›´æ–°device channel çš„经纬度
                DeviceChannel deviceChannel = new DeviceChannel();
                deviceChannel.setDeviceId(device.getDeviceId());
                deviceChannel.setChannelId(channelId);
                deviceChannel.setLongitude(mobilePosition.getLongitude());
                deviceChannel.setLatitude(mobilePosition.getLatitude());
                deviceChannel.setGpsTime(mobilePosition.getTime());
                        // æ›´æ–°device channel çš„经纬度
                        DeviceChannel deviceChannel = new DeviceChannel();
                        deviceChannel.setDeviceId(device.getDeviceId());
                        deviceChannel.setChannelId(channelId);
                        deviceChannel.setLongitude(mobilePosition.getLongitude());
                        deviceChannel.setLatitude(mobilePosition.getLatitude());
                        deviceChannel.setGpsTime(mobilePosition.getTime());
                deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
                        deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
                mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
                mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
                mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                        mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
                        mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
                        mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
                        mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
                deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
                        deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
                    }
                    // å›žå¤200 OK
                    if (redisCatchStorage.deviceIsOnline(deviceId)) {
                        publisher.deviceAlarmEventPublish(deviceAlarm);
                    }
                } catch (DocumentException e) {
                    logger.error("未处理的异常 ", e);
                }
            }
            // å›žå¤200 OK
            if (redisCatchStorage.deviceIsOnline(deviceId)) {
                publisher.deviceAlarmEventPublish(deviceAlarm);
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
@@ -353,4 +380,9 @@
    /**
     * Replaces the {@code redisCatchStorage} instance held by this object.
     *
     * @param redisCatchStorage instance to assign to the {@code redisCatchStorage} field
     */
    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
    /**
     * Periodically logs how many NOTIFY messages are currently waiting in {@code taskQueue}.
     */
    @Scheduled(fixedRate = 10000)   // runs every 10 seconds (fixedRate is given in milliseconds)
    public void execute(){
        logger.info("[待处理Notify消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
@@ -532,16 +532,17 @@
                    String status = getText(itemDevice, "Status");
                    if (status != null) {
                        // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
                        if (status.equals("ON") || status.equals("On") || status.equals("ONLINE") || status.equals("OK")) {
                        if (status.equalsIgnoreCase("ON") || status.equalsIgnoreCase("On") || status.equalsIgnoreCase("ONLINE") || status.equalsIgnoreCase("OK")) {
                            deviceChannel.setStatus(true);
                        }
                        if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
                        if (status.equalsIgnoreCase("OFF") || status.equalsIgnoreCase("Off") || status.equalsIgnoreCase("OFFLINE")) {
                            deviceChannel.setStatus(false);
                        }
                    }else {
                        deviceChannel.setStatus(true);
                    }
//                    logger.info("状态字符串: {}", status);
//                    logger.info("状态结果: {}", deviceChannel.isStatus());
                    // ç»åº¦
                    String longitude = getText(itemDevice, "Longitude");
                    if (NumericUtil.isDouble(longitude)) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -5,6 +5,8 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -21,6 +23,8 @@
import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent;
import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
@@ -66,6 +70,10 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IRedisRpcService redisRpcService;
    @Autowired
    private IInviteStreamService inviteStreamService;
@@ -86,9 +94,6 @@
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private ZLMMediaListManager zlmMediaListManager;
    @Autowired
    private HookSubscribe subscribe;
@@ -117,6 +122,9 @@
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    @Autowired
    private IStreamPushService streamPushService;
    /**
     * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
@@ -172,6 +180,54 @@
        if (mediaServer == null) {
            return new HookResultForOnPublish(0, "success");
        }
        // 推流鉴权的处理
        if (!"rtp".equals(param.getApp())) {
            StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
            if (stream != null) {
                HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
                result.setEnable_audio(stream.isEnableAudio());
                result.setEnable_mp4(stream.isEnableMp4());
                return result;
            }
            if (userSetting.getPushAuthority()) {
                // 推流鉴权
                if (param.getParams() == null) {
                    logger.info("推流鉴权失败: ç¼ºå°‘必要参数:sign=md5(user表的pushKey)");
                    return new HookResultForOnPublish(401, "Unauthorized");
                }
                Map<String, String> paramMap = urlParamToMap(param.getParams());
                String sign = paramMap.get("sign");
                if (sign == null) {
                    logger.info("推流鉴权失败: ç¼ºå°‘必要参数:sign=md5(user表的pushKey)");
                    return new HookResultForOnPublish(401, "Unauthorized");
                }
                // 推流自定义播放鉴权码
                String callId = paramMap.get("callId");
                // 鉴权配置
                boolean hasAuthority = userService.checkPushAuthority(callId, sign);
                if (!hasAuthority) {
                    logger.info("推流鉴权失败: sign æ— æƒé™: callId={}. sign={}", callId, sign);
                    return new HookResultForOnPublish(401, "Unauthorized");
                }
                StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
                streamAuthorityInfo.setCallId(callId);
                streamAuthorityInfo.setSign(sign);
                // 鉴权通过
                redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
            }
        } else {
            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
        }
        HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
        result.setEnable_audio(true);
        taskExecutor.execute(() -> {
            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
            if (subscribe != null) {
                subscribe.response(mediaInfo, param);
            }
        });
        ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
        if (resultForOnPublish != null) {
@@ -207,6 +263,221 @@
            MediaDepartureEvent mediaDepartureEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
            applicationEventPublisher.publishEvent(mediaDepartureEvent);
        }
        JSONObject json = (JSONObject) JSON.toJSON(param);
        taskExecutor.execute(() -> {
            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
            MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
            if (mediaInfo == null) {
                logger.info("[ZLM HOOK] æµå˜åŒ–未找到ZLM, {}", param.getMediaServerId());
                return;
            }
            if (subscribe != null) {
                subscribe.response(mediaInfo, param);
            }
            List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
            // TODO é‡æž„此处逻辑
            if (param.isRegist()) {
                // å¤„理流注册的鉴权信息, æµæ³¨é”€è¿™é‡Œä¸å†åˆ é™¤é‰´æƒä¿¡æ¯ï¼Œä¸‹æ¬¡æ¥äº†æ–°çš„鉴权信息会对就的进行覆盖
                if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                        || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                        || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                    StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
                    if (streamAuthorityInfo == null) {
                        streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
                    } else {
                        streamAuthorityInfo.setOriginType(param.getOriginType());
                        streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr());
                    }
                    redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
                }
            }
            if ("rtsp".equals(param.getSchema())) {
                logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
                if (param.isRegist()) {
                    mediaServerService.addCount(param.getMediaServerId());
                } else {
                    mediaServerService.removeCount(param.getMediaServerId());
                }
                int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
                if (updateStatusResult > 0) {
                }
                if ("rtp".equals(param.getApp()) && !param.isRegist()) {
                    InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
                    if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
                        inviteStreamService.removeInviteInfo(inviteInfo);
                        storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
                    }
                } else if ("broadcast".equals(param.getApp())) {
                    // è¯­éŸ³å¯¹è®²æŽ¨æµ  stream需要满足格式deviceId_channelId
                    if (param.getStream().indexOf("_") > 0) {
                        String[] streamArray = param.getStream().split("_");
                        if (streamArray.length == 2) {
                            String deviceId = streamArray[0];
                            String channelId = streamArray[1];
                            Device device = deviceService.getDevice(deviceId);
                            if (device != null) {
                                if (param.isRegist()) {
                                    if (audioBroadcastManager.exit(deviceId, channelId)) {
                                        playService.stopAudioBroadcast(deviceId, channelId);
                                    }
                                    // å¼€å¯è¯­éŸ³å¯¹è®²é€šé“
                                    try {
                                        playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> {
                                            logger.info("[语音对讲] é€šé“建立成功, device: {}, channel: {}", deviceId, channelId);
                                        });
                                    } catch (InvalidArgumentException | ParseException | SipException e) {
                                        logger.error("[命令发送失败] è¯­éŸ³å¯¹è®²: {}", e.getMessage());
                                    }
                                } else {
                                    // æµæ³¨é”€
                                    playService.stopAudioBroadcast(deviceId, channelId);
                                }
                            } else {
                                logger.info("[语音对讲] æœªæ‰¾åˆ°è®¾å¤‡ï¼š{}", deviceId);
                            }
                        }
                    }
                } else if ("talk".equals(param.getApp())) {
                    // è¯­éŸ³å¯¹è®²æŽ¨æµ  stream需要满足格式deviceId_channelId
                    if (param.getStream().indexOf("_") > 0) {
                        String[] streamArray = param.getStream().split("_");
                        if (streamArray.length == 2) {
                            String deviceId = streamArray[0];
                            String channelId = streamArray[1];
                            Device device = deviceService.getDevice(deviceId);
                            if (device != null) {
                                if (param.isRegist()) {
                                    if (audioBroadcastManager.exit(deviceId, channelId)) {
                                        playService.stopAudioBroadcast(deviceId, channelId);
                                    }
                                    // å¼€å¯è¯­éŸ³å¯¹è®²é€šé“
                                    playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> {
                                        logger.info("[语音对讲] é€šé“建立成功, device: {}, channel: {}", deviceId, channelId);
                                    });
                                } else {
                                    // æµæ³¨é”€
                                    playService.stopTalk(device, channelId, param.isRegist());
                                }
                            } else {
                                logger.info("[语音对讲] æœªæ‰¾åˆ°è®¾å¤‡ï¼š{}", deviceId);
                            }
                        }
                    }
                } else {
                    if (!"rtp".equals(param.getApp())) {
                        String type = OriginType.values()[param.getOriginType()].getType();
                        if (param.isRegist()) {
                            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
                                    param.getApp(), param.getStream());
                            String callId = null;
                            if (streamAuthorityInfo != null) {
                                callId = streamAuthorityInfo.getCallId();
                            }
                            StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
                                    param.getApp(), param.getStream(), tracks, callId);
                            param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
                            redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
                            if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                                    || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                    || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                param.setSeverId(userSetting.getServerId());
                                zlmMediaListManager.addPush(param);
                                // å†—余数据,自己系统中自用
                                redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                            }
                        } else {
                            // å…¼å®¹æµæ³¨é”€æ—¶ç±»åž‹ä»Žredis记录获取
                            OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
                                    param.getApp(), param.getStream(), param.getMediaServerId());
                            if (onStreamChangedHookParam != null) {
                                type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
                                redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
                                if ("PUSH".equalsIgnoreCase(type)) {
                                    // å†—余数据,自己系统中自用
                                    redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId());
                                }
                            }
                            GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                            if (gbStream != null) {
//                                    eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
                            }
                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                        }
                        GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                        if (gbStream != null) {
                            if (userSetting.isUsePushingAsStatus()) {
                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
                            }
                        }
                        if (type != null) {
                            // å‘送流变化redis消息
                            JSONObject jsonObject = new JSONObject();
                            jsonObject.put("serverId", userSetting.getServerId());
                            jsonObject.put("app", param.getApp());
                            jsonObject.put("stream", param.getStream());
                            jsonObject.put("register", param.isRegist());
                            jsonObject.put("mediaServerId", param.getMediaServerId());
                            redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                        }
                    }
                }
                if (!param.isRegist()) {
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
                            if (sendRtpItem == null) {
                                continue;
                            }
                            if (sendRtpItem.getApp().equals(param.getApp())) {
                                logger.info(sendRtpItem.toString());
                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
                                    // é€šçŸ¥å…¶ä»–wvp停止发流
                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
                                }else {
                                    String platformId = sendRtpItem.getPlatformId();
                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
                                    Device device = deviceService.getDevice(platformId);
                                    try {
                                        if (platform != null) {
                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
                                        } else {
                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                if (audioBroadcastCatch != null) {
                                                    // æ¥è‡ªä¸Šçº§å¹³å°çš„停止对讲
                                                    logger.info("[停止对讲] æ¥è‡ªä¸Šçº§ï¼Œå¹³å°ï¼š{}, é€šé“:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                }
                                            }
                                        }
                                    } catch (SipException | InvalidArgumentException | ParseException |
                                             SsrcTransactionNotFoundException e) {
                                        logger.error("[命令发送失败] å‘送BYE: {}", e.getMessage());
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });
        return HookResult.SUCCESS();
    }
@@ -220,6 +491,62 @@
        logger.info("[ZLM HOOK]流无人观看:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
                param.getApp(), param.getStream());
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
        // å›½æ ‡ç±»åž‹çš„æµ
        if ("rtp".equals(param.getApp())) {
            ret.put("close", userSetting.getStreamOnDemand());
            // å›½æ ‡æµï¼Œ ç‚¹æ’­/录像回放/录像下载
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
            // ç‚¹æ’­
            if (inviteInfo != null) {
                // å½•像下载
                if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
                    ret.put("close", false);
                    return ret;
                }
                // æ”¶åˆ°æ— äººè§‚看说明流也没有在往上级推送
                if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                            inviteInfo.getChannelId());
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
                            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                            try {
                                commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
                            } catch (SipException | InvalidArgumentException | ParseException e) {
                                logger.error("[命令发送失败] å›½æ ‡çº§è” å‘送BYE: {}", e.getMessage());
                            }
                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                        sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                            }
                        }
                    }
                }
                Device device = deviceService.getDevice(inviteInfo.getDeviceId());
                if (device != null) {
                    try {
                        // å¤šæŸ¥è¯¢ä¸€æ¬¡é˜²æ­¢å·²ç»è¢«å¤„理了
                        InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
                                inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
                        if (info != null) {
                            cmder.streamByeCmd(device, inviteInfo.getChannelId(),
                                    inviteInfo.getStream(), null);
                        } else {
                            logger.info("[无人观看] æœªæ‰¾åˆ°è®¾å¤‡çš„点播信息: {}, æµï¼š{}", inviteInfo.getDeviceId(), param.getStream());
                        }
                    } catch (InvalidArgumentException | ParseException | SipException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[无人观看]点播, å‘送BYE失败 {}", e.getMessage());
                    }
                } else {
                    logger.info("[无人观看] æœªæ‰¾åˆ°è®¾å¤‡ï¼š {},流:{}", inviteInfo.getDeviceId(), param.getStream());
                }
        boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema());
        ret.put("code", close);
@@ -282,16 +609,22 @@
        if (!"rtp".equals(param.getApp())) {
            return HookResult.SUCCESS();
        }
        try {
            MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this);
            MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
            if (mediaServerItem != null) {
                event.setMediaServer(mediaServerItem);
                applicationEventPublisher.publishEvent(event);
        taskExecutor.execute(() -> {
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
            if (sendRtpItems.size() > 0) {
                for (SendRtpItem sendRtpItem : sendRtpItems) {
                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                    ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                    try {
                        commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] å›½æ ‡çº§è” å‘送BYE: {}", e.getMessage());
                    }
                    redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                            sendRtpItem.getCallId(), sendRtpItem.getStream());
                }
            }
        }catch (Exception e) {
            logger.info("[ZLM-HOOK-rtp发送关闭] å‘送通知失败 ", e);
        }
        });
        return HookResult.SUCCESS();
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
File was deleted
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -264,4 +264,13 @@
        }
        return result;
    }
    public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) {
        Map<String, Object> param = new HashMap<>();
        param.put("vhost", "__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import java.text.ParseException;
/**
@@ -7,5 +9,5 @@
 */
public interface ChannelOnlineEvent {
    // NOTE(review): both signatures below appear in this view; they may be the before/after
    // forms of a single method rather than two overloads — confirm against the real file.

    // Callback taking the app, stream and server id; may fail with ParseException.
    void run(String app, String stream, String serverId) throws ParseException;
    // Callback taking the SendRtpItem directly; may fail with ParseException.
    void run(SendRtpItem sendRtpItem) throws ParseException;
}
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -99,4 +99,13 @@
    void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
    void stopPlay(String deviceId, String channelId);
    /**
     * Updates the position data of the given channels in bulk.
     *
     * @param channelList channels whose position data is written
     */
    void batchUpdateChannelGPS(List<DeviceChannel> channelList);
    /**
     * Stores the given mobile positions in bulk.
     *
     * @param addMobilePositionList positions to store
     */
    void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
    /**
     * Applies the "online" update for the channel, keyed by its device id and channel id.
     *
     * @param channel channel to update
     */
    void online(DeviceChannel channel);
    /**
     * Applies the "offline" update for the channel, keyed by its device id and channel id.
     *
     * @param channel channel to update
     */
    void offline(DeviceChannel channel);
    /**
     * Removes the channel, keyed by its device id and channel id.
     *
     * @param channel channel to remove
     */
    void delete(DeviceChannel channel);
}
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -115,4 +115,5 @@
    Map<String, StreamPushItem> getAllAppAndStreamMap();
    /**
     * Creates or updates the stored push-stream record from a stream-changed hook parameter.
     *
     * @param param hook parameter describing the stream (app, stream, registration state, media server)
     */
    void updatePush(OnStreamChangedHookParam param);
}
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -61,6 +61,7 @@
        messageForPushChannel.setGbId(gbId);
        messageForPushChannel.setApp(app);
        messageForPushChannel.setStream(stream);
        messageForPushChannel.setServerId(serverId);
        messageForPushChannel.setMediaServerId(mediaServerId);
        messageForPushChannel.setPlatFormId(platFormId);
        messageForPushChannel.setPlatFormName(platFormName);
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
@@ -248,8 +249,24 @@
    }
    /**
     * Delegates to {@code channelMapper.online} with the channel's device id and channel id.
     *
     * @param channel channel whose device id and channel id are passed to the mapper
     */
    @Override
    public void online(DeviceChannel channel) {
        channelMapper.online(channel.getDeviceId(), channel.getChannelId());
    }
    /**
     * Delegates to {@code channelMapper.batchOffline} for the given channels in a single call.
     *
     * @param channels channels handed to the mapper
     * @return the value returned by {@code channelMapper.batchOffline}
     *         (presumably the number of affected rows — TODO confirm)
     */
    @Override
    public int channelsOffline(List<DeviceChannel> channels) {
        return channelMapper.batchOffline(channels);
    }
    /**
     * Delegates to {@code channelMapper.offline} with the channel's device id and channel id.
     *
     * @param channel channel whose device id and channel id are passed to the mapper
     */
    @Override
    public void offline(DeviceChannel channel) {
        channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
    }
    /**
     * Delegates to {@code channelMapper.del} with the channel's device id and channel id.
     *
     * @param channel channel whose device id and channel id are passed to the mapper
     */
    @Override
    public void delete(DeviceChannel channel) {
        channelMapper.del(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
@@ -358,4 +375,47 @@
    public void stopPlay(String deviceId, String channelId) {
        // Delegates to channelMapper.stopPlay for the given device id / channel id pair.
        channelMapper.stopPlay(deviceId, channelId);
    }
    @Override
    @Transactional
    public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
        for (DeviceChannel deviceChannel : channelList) {
            deviceChannel.setUpdateTime(DateUtil.getNow());
            if (deviceChannel.getGpsTime() == null) {
                deviceChannel.setGpsTime(DateUtil.getNow());
            }
        }
        int count = 1000;
        if (channelList.size() > count) {
            for (int i = 0; i < channelList.size(); i+=count) {
                int toIndex = i+count;
                if ( i + count > channelList.size()) {
                    toIndex = channelList.size();
                }
                List<DeviceChannel> channels = channelList.subList(i, toIndex);
                channelMapper.batchUpdatePosition(channels);
            }
        }else {
            channelMapper.batchUpdatePosition(channelList);
        }
    }
    /**
     * Passes the given mobile positions to {@code deviceMobilePositionMapper.batchadd} in one call.
     *
     * @param mobilePositions positions handed to the mapper as a single batch
     */
    @Override
    @Transactional
    public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
        // Disabled alternative kept below: it split the list into slices of at most 500
        // entries and called batchadd once per slice.
//        int count = 500;
//        if (mobilePositions.size() > count) {
//            for (int i = 0; i < mobilePositions.size(); i+=count) {
//                int toIndex = i+count;
//                if ( i + count > mobilePositions.size()) {
//                    toIndex = mobilePositions.size();
//                }
//                List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
//                deviceMobilePositionMapper.batchadd(mobilePositionsSub);
//            }
//        }else {
//            deviceMobilePositionMapper.batchadd(mobilePositions);
//        }
        deviceMobilePositionMapper.batchadd(mobilePositions);
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -558,6 +558,7 @@
                    removeMobilePositionSubscribe(deviceInStore, result->{
                        // å¼€å¯è®¢é˜…
                        deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                        deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                        addMobilePositionSubscribe(deviceInStore);
                        // å› ä¸ºæ˜¯å¼‚步执行,需要在这里更新下数据
                        deviceMapper.updateCustom(deviceInStore);
@@ -566,12 +567,14 @@
                }else {
                    // å¼€å¯è®¢é˜…
                    deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                    deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                    addMobilePositionSubscribe(deviceInStore);
                }
            }else if (device.getSubscribeCycleForMobilePosition() == 0) {
                // å–消订阅
                deviceInStore.setSubscribeCycleForMobilePosition(0);
                deviceInStore.setMobilePositionSubmissionInterval(0);
                removeMobilePositionSubscribe(deviceInStore, null);
            }
        }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -28,7 +28,6 @@
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@@ -111,6 +110,14 @@
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private SSRCFactory ssrcFactory;
@@ -266,7 +273,6 @@
            logger.warn("[点播] æœªæ‰¾åˆ°å¯ç”¨çš„zlm deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
            logger.warn("[点播] å•端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -280,6 +286,8 @@
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null ) {
            if (inviteInfo.getStreamInfo() == null) {
                // é‡Šæ”¾ç”Ÿæˆçš„ssrc,使用上一次申请的
                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                // ç‚¹æ’­å‘起了但是尚未成功, ä»…注册回调等待结果即可
                inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                logger.info("[点播开始] å·²ç»è¯·æ±‚中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -633,4 +633,21 @@
    public Map<String, StreamPushItem> getAllAppAndStreamMap() {
        // Returns the mapper's result unchanged; the map key format is defined by
        // streamPushMapper.getAllAppAndStreamMap (presumably app + stream — TODO confirm).
        return streamPushMapper.getAllAppAndStreamMap();
    }
    @Override
    public void updatePush(OnStreamChangedHookParam param) {
        StreamPushItem transform = transform(param);
        StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
        transform.setPushIng(param.isRegist());
        transform.setUpdateTime(DateUtil.getNow());
        transform.setPushTime(DateUtil.getNow());
        transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
        if (pushInDb == null) {
            transform.setCreateTime(DateUtil.getNow());
            streamPushMapper.add(transform);
        }else {
            streamPushMapper.update(transform);
            gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
New file
@@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
/**
 * Operations for RTP sending and for waiting on push streams.
 * NOTE(review): given the package name (redisMsg), these are presumably carried out as
 * RPC calls to another WVP node over Redis — confirm against the implementation.
 */
public interface IRedisRpcService {
    /** Returns the SendRtpItem identified by {@code sendRtpItemKey}; presumably a Redis key — TODO confirm. */
    SendRtpItem getSendRtpItem(String sendRtpItemKey);
    /** Requests that RTP sending start for the item identified by {@code sendRtpItemKey}. */
    WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem);
    /** Requests that RTP sending stop for the item identified by {@code sendRtpItemKey}. */
    WVPResult stopSendRtp(String sendRtpItemKey);
    /**
     * Registers {@code callback} for the push stream described by {@code sendRtpItem}.
     * The returned long is presumably the key accepted by {@link #removeCallback(long)} — TODO confirm.
     */
    long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
    /** Cancels a wait previously started for the push stream described by {@code sendRtpItem}. */
    void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
    /** Reports that RTP sending has stopped for the item identified by {@code sendRtpItemKey}. */
    void rtpSendStopped(String sendRtpItemKey);
    /** Removes the callback registered under {@code key}. */
    void removeCallback(long key);
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
old mode 100755 new mode 100644
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
New file
@@ -0,0 +1,304 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
/**
 * RPC calls initiated by other WVP nodes; the methods here are looked up by method name via reflection and invoked by RedisRpcConfig.
 */
@Component
public class RedisRpcController {
    private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private ISIPCommanderForPlatform commanderFroPlatform;
    @Autowired
    private IVideoManagerStorage storager;
    /**
     * èŽ·å–å‘æµçš„ä¿¡æ¯
     */
    public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] èŽ·å–å‘æµçš„ä¿¡æ¯, æœªæ‰¾åˆ°redis中的发流信息, key:{}", sendRtpItemKey);
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
            return response;
        }
        logger.info("[redis-rpc] èŽ·å–å‘æµçš„ä¿¡æ¯ï¼š {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
        // æŸ¥è¯¢æœ¬çº§æ˜¯å¦æœ‰è¿™ä¸ªæµ
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (mediaServerItem == null) {
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        // è‡ªå¹³å°å†…容
        int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
        if (localPort == 0) {
            logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        // å†™å…¥redis, è¶…时时回复
        sendRtpItem.setStatus(1);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
        if (sendRtpItem.getSsrc() == null) {
            // ä¸Šçº§å¹³å°ç‚¹æ’­æ—¶ä¸ä½¿ç”¨ä¸Šçº§å¹³å°æŒ‡å®šçš„ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
            String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
            sendRtpItem.setSsrc(ssrc);
        }
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
        redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        response.setBody(sendRtpItemKey);
        return response;
    }
    /**
     * ç›‘听流上线
     */
    public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] ç›‘听流上线: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
        // æŸ¥è¯¢æœ¬çº§æ˜¯å¦æœ‰è¿™ä¸ªæµ
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (mediaServerItem != null) {
            logger.info("[redis-rpc] ç›‘听流上线时发现流已存在直接返回: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
            // è¯»å–redis中的上级点播信息,生成sendRtpItm发送出去
            if (sendRtpItem.getSsrc() == null) {
                // ä¸Šçº§å¹³å°ç‚¹æ’­æ—¶ä¸ä½¿ç”¨ä¸Šçº§å¹³å°æŒ‡å®šçš„ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                sendRtpItem.setSsrc(ssrc);
            }
            sendRtpItem.setMediaServerId(mediaServerItem.getId());
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
            RedisRpcResponse response = request.getResponse();
            response.setBody(sendRtpItem.getRedisKey());
            response.setStatusCode(200);
        }
        // ç›‘听流上线。 æµä¸Šçº¿ç›´æŽ¥å‘送sendRtpItem消息给实际的信令处理者
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
            logger.info("[redis-rpc] ç›‘听流上线,流已上线: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
            // è¯»å–redis中的上级点播信息,生成sendRtpItm发送出去
            if (sendRtpItem.getSsrc() == null) {
                // ä¸Šçº§å¹³å°ç‚¹æ’­æ—¶ä¸ä½¿ç”¨ä¸Šçº§å¹³å°æŒ‡å®šçš„ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
                sendRtpItem.setSsrc(ssrc);
            }
            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
            RedisRpcResponse response = request.getResponse();
            response.setBody(sendRtpItem.getRedisKey());
            response.setStatusCode(200);
            // æ‰‹åŠ¨å‘é€ç»“æžœ
            sendResponse(response);
            hookSubscribe.removeSubscribe(hook);
        });
        return null;
    }
    /**
     * åœæ­¢ç›‘听流上线
     */
    public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] åœæ­¢ç›‘听流上线: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        // ç›‘听流上线。 æµä¸Šçº¿ç›´æŽ¥å‘送sendRtpItem消息给实际的信令处理者
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.removeSubscribe(hook);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        return response;
    }
    /**
     * å¼€å§‹å‘流
     */
    public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] å¼€å§‹å‘流, æœªæ‰¾åˆ°redis中的发流信息, key:{}", sendRtpItemKey);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
            response.setBody(wvpResult);
            return response;
        }
        logger.info("[redis-rpc] å¼€å§‹å‘流: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
            response.setBody(wvpResult);
            return response;
        }
        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!streamReady) {
            logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
            response.setBody(wvpResult);
            return response;
        }
        JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[redis-rpc] å‘流成功: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
            WVPResult wvpResult = WVPResult.success();
            response.setBody(wvpResult);
        }else {
            logger.info("[redis-rpc] å‘流失败: {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject);
            WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
            response.setBody(wvpResult);
        }
        return response;
    }
    /**
     * åœæ­¢å‘流
     */
    public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] åœæ­¢æŽ¨æµ, æœªæ‰¾åˆ°redis中的发流信息, key:{}", sendRtpItemKey);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
            response.setBody(wvpResult);
            return response;
        }
        logger.info("[redis-rpc] åœæ­¢æŽ¨æµï¼š {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
            response.setBody(wvpResult);
            return response;
        }
        JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[redis-rpc] åœæ­¢æŽ¨æµæˆåŠŸï¼š {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
            response.setBody(WVPResult.success());
            return response;
        }else {
            int code = jsonObject.getInteger("code");
            String msg = jsonObject.getString("msg");
            logger.info("[redis-rpc] åœæ­¢æŽ¨æµå¤±è´¥ï¼š {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg );
            response.setBody(WVPResult.fail(code, msg));
            return response;
        }
    }
    /**
     * å…¶ä»–wvp通知推流已经停止了
     */
    public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] æŽ¨æµå·²ç»åœæ­¢, æœªæ‰¾åˆ°redis中的发流信息, key:{}", sendRtpItemKey);
            return response;
        }
        logger.info("[redis-rpc] æŽ¨æµå·²ç»åœæ­¢ï¼š {}/{}, ç›®æ ‡åœ°å€ï¼š {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        String platformId = sendRtpItem.getPlatformId();
        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
        if (platform == null) {
            return response;
        }
        try {
            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                    sendRtpItem.getCallId(), sendRtpItem.getStream());
            redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] å‘送BYE: {}", e.getMessage());
        }
        return response;
    }
    private void sendResponse(RedisRpcResponse response){
        logger.info("[redis-rpc] >> {}", response);
        response.setToId(userSetting.getServerId());
        RedisRpcMessage message = new RedisRpcMessage();
        message.setResponse(response);
        redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
New file
@@ -0,0 +1,155 @@
package com.genersoft.iot.vmp.service.redisMsg.service;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class RedisRpcServiceImpl implements IRedisRpcService {
    private final static Logger logger = LoggerFactory.getLogger(RedisRpcServiceImpl.class);
    @Autowired
    private RedisRpcConfig redisRpcConfig;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    private RedisRpcRequest buildRequest(String uri, Object param) {
        RedisRpcRequest request = new RedisRpcRequest();
        request.setFromId(userSetting.getServerId());
        request.setParam(param);
        request.setUri(uri);
        return request;
    }
    @Override
    public SendRtpItem getSendRtpItem(String sendRtpItemKey) {
        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        if (response.getBody() == null) {
            return null;
        }
        return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString());
    }
    @Override
    public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
        logger.info("[请求其他WVP] å¼€å§‹æŽ¨æµï¼Œwvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
        request.setToId(sendRtpItem.getServerId());
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
    }
    @Override
    public WVPResult stopSendRtp(String sendRtpItemKey) {
        SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            logger.info("[请求其他WVP] åœæ­¢æŽ¨æµ, æœªæ‰¾åˆ°redis中的发流信息, key:{}", sendRtpItemKey);
            return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
        }
        logger.info("[请求其他WVP] åœæ­¢æŽ¨æµï¼Œwvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey);
        request.setToId(sendRtpItem.getServerId());
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
    }
    /**
     * Waits for a pushed stream to come online, either on this node or on another WVP node.
     * <p>
     * Two listeners are set up: a local stream-changed hook, and an RPC request
     * ("waitePushStreamOnline") addressed to {@code sendRtpItem.getServerId()}. Whichever fires
     * passes a redis key of the send-RTP description to {@code callback} and removes the local
     * hook; the local hook additionally removes the RPC callback registered for this request.
     *
     * @param sendRtpItem description of the stream to wait for; it is mutated and stored in redis
     *                    when the local hook fires
     * @param callback    receives the redis key of the send-RTP description; may be {@code null}
     * @return the request's sn, which the local hook passes to
     *         {@code redisRpcConfig.removeCallback}
     */
    @Override
    public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
        logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        // Listen for the stream coming online. Once it does, the sendRtpItem message is sent
        // directly to the node that actually handles the signaling.
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
        request.setToId(sendRtpItem.getServerId());
        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
            // Local case: read the upper platform's play request info and build the sendRtpItem.
            if (sendRtpItem.getSsrc() == null) {
                // When an upper platform requests playback, the ssrc it specified is not used; a
                // custom ssrc is generated instead (see the GB standard: SSRC handling for media
                // streams of external-domain devices).
                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
                sendRtpItem.setSsrc(ssrc);
            }
            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
            if (callback != null) {
                callback.run(sendRtpItem.getRedisKey());
            }
            // The stream arrived locally: drop the hook and stop waiting for the remote answer.
            hookSubscribe.removeSubscribe(hook);
            redisRpcConfig.removeCallback(request.getSn());
        });
        redisRpcConfig.request(request, response -> {
            // Remote case: another node answered the RPC request.
            if (response.getBody() == null) {
                // No send-RTP key in the answer; the local hook stays subscribed in this case.
                logger.info("[请求所有WVP监听流上线] æµä¸Šçº¿,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                return;
            }
            logger.info("[请求所有WVP监听流上线] æµä¸Šçº¿ {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
            if (callback != null) {
                callback.run(response.getBody().toString());
            }
            hookSubscribe.removeSubscribe(hook);
        });
        return request.getSn();
    }
    @Override
    public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
        logger.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.removeSubscribe(hook);
        RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
        request.setToId(sendRtpItem.getServerId());
        redisRpcConfig.request(request, 10);
    }
    @Override
    public void rtpSendStopped(String sendRtpItemKey) {
        SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            logger.info("[停止WVP监听流上线] æœªæ‰¾åˆ°redis中的发流信息, key:{}", sendRtpItemKey);
            return;
        }
        RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey);
        request.setToId(sendRtpItem.getServerId());
        redisRpcConfig.request(request, 10);
    }
    /**
     * Delegates to {@code redisRpcConfig.removeCallback} for the given key.
     *
     * @param key key of the callback to remove
     */
    @Override
    public void removeCallback(long key) {
        redisRpcConfig.removeCallback(key);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -45,6 +45,8 @@
    void updateSendRTPSever(SendRtpItem sendRtpItem);
    List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId);
    /**
     * æŸ¥è¯¢RTP推送信息缓存
     * @param platformGbId
@@ -197,6 +199,8 @@
    void addDiskInfo(List<Map<String, Object>> diskInfo);
    void deleteSendRTPServer(SendRtpItem sendRtpItem);
    List<SendRtpItem> queryAllSendRTPServer();
    List<Device> getAllDevices();
@@ -209,7 +213,7 @@
    void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
    void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
    void addPushListItem(String app, String stream, MediaArrivalEvent param);
@@ -219,4 +223,11 @@
    void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
    void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
    SendRtpItem getWaiteSendRtpItem(String app, String stream);
    void sendStartSendRtp(SendRtpItem sendRtpItem);
    void sendPushStreamOnline(SendRtpItem sendRtpItem);
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -52,8 +52,8 @@
            "<if test='status != null'>, status=#{status}</if>" +
            "<if test='streamId != null'>, stream_id=#{streamId}</if>" +
            "<if test='hasAudio != null'>, has_audio=#{hasAudio}</if>" +
            ", custom_longitude=#{longitude}" +
            ", custom_latitude=#{latitude}" +
            "<if test='customLongitude != null'>, custom_longitude=#{customLongitude}</if>" +
            "<if test='customLatitude != null'>, custom_latitude=#{customLatitude}</if>" +
            "<if test='longitudeGcj02 != null'>, longitude_gcj02=#{longitudeGcj02}</if>" +
            "<if test='latitudeGcj02 != null'>, latitude_gcj02=#{latitudeGcj02}</if>" +
            "<if test='longitudeWgs84 != null'>, longitude_wgs84=#{longitudeWgs84}</if>" +
@@ -89,8 +89,10 @@
            "dc.password, " +
            "COALESCE(dc.custom_ptz_type, dc.ptz_type) AS ptz_type, " +
            "dc.status, " +
            "COALESCE(dc.custom_longitude, dc.longitude) AS longitude, " +
            "COALESCE(dc.custom_latitude, dc.latitude) AS latitude, " +
            "dc.longitude, " +
            "dc.latitude, " +
            "dc.custom_longitude, " +
            "dc.custom_latitude, " +
            "dc.stream_id, " +
            "dc.device_id, " +
            "dc.parental, " +
@@ -345,6 +347,8 @@
            "<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
            "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
@@ -397,6 +401,23 @@
            " </script>"})
    int updatePosition(DeviceChannel deviceChannel);
    /**
     * Updates the position columns of several channels in one mapper call.
     * <p>
     * One UPDATE statement is rendered per list element, separated by ';'. {@code gps_time} is
     * always written; each coordinate column is written only when the corresponding property is
     * non-null. Rows are matched by {@code device_id} and, when present, {@code channel_id}.
     * NOTE(review): executing several ';'-separated statements in one call presumably requires
     * multi-statement support from the JDBC driver / connection settings - confirm.
     *
     * @param deviceChannelList channels carrying the new position data
     * @return the update count reported for the call
     */
    @Update({"<script>" +
            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
            " UPDATE" +
            " wvp_device_channel" +
            " SET gps_time=#{item.gpsTime}" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
            "WHERE device_id=#{item.deviceId} " +
            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
            "</foreach>" +
            "</script>"})
    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
    /**
     * Selects every row of {@code wvp_device_channel} whose {@code stream_id}, after trimming,
     * is non-empty.
     *
     * @return the matching channels
     */
    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -33,4 +33,33 @@
    /**
     * Deletes all rows of {@code wvp_device_mobile_position} for the given device.
     *
     * @param deviceId value matched against the {@code device_id} column
     * @return the number of deleted rows as reported by the statement
     */
    @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
    int clearMobilePositionsByDeviceId(String deviceId);
    /**
     * Inserts several mobile positions with a single multi-row INSERT statement: one
     * {@code values} tuple is rendered per list element, separated by ','.
     *
     * @param mobilePositions positions to insert
     */
    @Insert("<script> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "values " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchadd2(List<MobilePosition> mobilePositions);
    @Insert("<script> " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "values " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}); " +
            "</foreach> " +
            "</script>")
    void batchadd(List<MobilePosition> mobilePositions);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -12,6 +12,8 @@
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -146,15 +148,26 @@
    @Override
    public void updateSendRTPSever(SendRtpItem sendRtpItem) {
        redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
    }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                userSetting.getServerId() + "_"
                + sendRtpItem.getMediaServerId() + "_"
                + sendRtpItem.getPlatformId() + "_"
                + sendRtpItem.getChannelId() + "_"
                + sendRtpItem.getStream() + "_"
                + sendRtpItem.getCallId();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    @Override
    public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
        String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + "*";
        List<SendRtpItem> result = new ArrayList<>();
        List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
        if (!scan.isEmpty()) {
            for (Object o : scan) {
                String key = (String) o;
                result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
            }
        }
        return result;
    }
    @Override
@@ -172,7 +185,7 @@
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + "*_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
@@ -268,9 +281,18 @@
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            for (Object keyStr : scan) {
                logger.info("[删除 redis的SendRTP]: {}", keyStr.toString());
                redisTemplate.delete(keyStr);
            }
        }
    }
    /**
     * Deletes the cached RTP push information of the given item by delegating to the
     * four-argument overload with the item's platform id, channel id, call id and stream.
     *
     * @param sendRtpItem item whose cached entry is to be removed
     */
    @Override
    public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
    }
    @Override
@@ -555,7 +577,7 @@
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        logger.info("[redis发送通知] å‘送 ç§»åŠ¨ä½ç½® {}: {}", key, jsonObject.toString());
//        logger.info("[redis发送通知] å‘送 ç§»åŠ¨ä½ç½® {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
@@ -646,9 +668,15 @@
    }
    @Override
    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
    public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        msg.setPlatFormIndex(platform.getId());
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        logger.info("[redis发送通知] å‘送 ä¸Šçº§å¹³å°åœæ­¢è§‚看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        logger.info("[redis发送通知] å‘送 ä¸Šçº§å¹³å°åœæ­¢è§‚看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
@@ -681,4 +709,30 @@
        logger.info("[redis发送通知] å‘送 åœæ­¢å‘上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    }
    @Override
    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
    }
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        logger.info("[redis发送通知] é€šçŸ¥å…¶ä»–WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
    @Override
    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
        logger.info("[redis发送通知] æµä¸Šçº¿ {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.service.ILogService;
import com.genersoft.iot.vmp.storager.dao.dto.LogDto;
@@ -92,4 +93,14 @@
        logService.clear();
    }
    @Autowired
    private RedisRpcConfig redisRpcConfig;
    @GetMapping("/test/count")
    public Object count() {
        return redisRpcConfig.getCallbackCount();
    }
}
web_src/src/components/channelList.vue
@@ -326,7 +326,9 @@
            e.ptzType = e.ptzType + "";
            that.$set(e, "edit", false);
            that.$set(e, "location", "");
            if (e.longitude && e.latitude) {
            if (e.customLongitude && e.customLatitude) {
              that.$set(e, "location", e.customLongitude + "," + e.customLatitude);
            }else if (e.longitude && e.latitude) {
              that.$set(e, "location", e.longitude + "," + e.latitude);
            }
          });
@@ -481,7 +483,9 @@
              e.ptzType = e.ptzType + "";
              this.$set(e, "edit", false);
              this.$set(e, "location", "");
              if (e.longitude && e.latitude) {
              if (e.customLongitude && e.customLatitude) {
                this.$set(e, "location", e.customLongitude + "," + e.customLatitude);
              }else if (e.longitude && e.latitude) {
                this.$set(e, "location", e.longitude + "," + e.latitude);
              }
            });
@@ -603,8 +607,8 @@
          this.$message.warning("位置信息格式有误,例:117.234,36.378");
          return;
        } else {
          row.longitude = parseFloat(segements[0]);
          row.latitude = parseFloat(segements[1]);
          row.customLongitude = parseFloat(segements[0]);
          row.custom_latitude = parseFloat(segements[1]);
          if (!(row.longitude && row.latitude)) {
            this.$message.warning("位置信息格式有误,例:117.234,36.378");
            return;
数据库/2.7.0/更新-mysql-2.7.0.sql
@@ -4,5 +4,15 @@
alter table wvp_device
    drop switch_primary_sub_stream;
# 第一个补丁包
alter table wvp_platform
    add send_stream_ip character varying(50);
    add send_stream_ip character varying(50);
alter table wvp_device
    change on_line on_line bool default false;
alter table wvp_device
    change id id serial primary key;
alter table wvp_device
    change ssrc_check ssrc_check bool default false;
数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql
@@ -4,5 +4,15 @@
alter table wvp_device
    drop switch_primary_sub_stream;
# 第一个补丁包
alter table wvp_platform
    add send_stream_ip character varying(50);
    add send_stream_ip character varying(50);
alter table wvp_device
    change on_line on_line bool default false;
alter table wvp_device
    change id id serial primary key;
alter table wvp_device
    change ssrc_check ssrc_check bool default false;