648540858
2022-03-07 b2c953fc76de2a9686ee81d5311bd9b06e453912
优化ssrc释放逻辑,优化级联点播速度,去除等待流格式的配置项
37个文件已修改
762 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 146 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/all-application.yml 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -5,6 +5,7 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@@ -25,15 +26,38 @@
        return new ThreadPoolTaskScheduler();
    }
    /**
     * Schedules a task that runs repeatedly.
     * @param key task ID
     * @param task the task to run
     * @param cycleForCatalog interval between runs, in seconds (multiplied by 1000 below)
     * @return the fixed string "startCron"
     */
    public String startCron(String key, Runnable task, int cycleForCatalog) {
        // NOTE(review): both stopCron(key) and stop(key) are present here; this looks like the
        // before/after lines of a rename shown together - confirm only one is intended.
        stopCron(key);
        stop(key);
        // scheduleWithFixedDelay waits for the previous run to finish before the delay starts counting; cycleForCatalog is that delay
        ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
        futureMap.put(key, future);
        return "startCron";
    }
    public void stopCron(String key) {
    /**
     * 延时任务
     * @param key 任务ID
     * @param task 任务
     * @param delay 延时 /秒
     * @return
     */
    public String startDelay(String key, Runnable task, int delay) {
        stop(key);
        Date starTime = new Date(System.currentTimeMillis() + delay * 1000);
        // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
        ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime);
        futureMap.put(key, future);
        return "startCron";
    }
    public void stop(String key) {
        if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
            futureMap.get(key).cancel(true);
        }
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -59,8 +59,11 @@
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
            // 取消订阅
            sipCommanderForPlatform.unregister(parentPlatform, null, null);
            Thread.sleep(500);
            sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
                ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId());
                sipCommanderForPlatform.register(platform, null, null);
            });
            // 发送平台未注册消息
            publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId());
        }
src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java
@@ -19,8 +19,6 @@
    private Long playTimeout = 18000L;
    private Boolean waitTrack = Boolean.FALSE;
    private Boolean interfaceAuthentication = Boolean.TRUE;
    private Boolean recordPushLive = Boolean.TRUE;
@@ -57,10 +55,6 @@
        return playTimeout;
    }
    public Boolean isWaitTrack() {
        return waitTrack;
    }
    public Boolean isInterfaceAuthentication() {
        return interfaceAuthentication;
    }
@@ -87,10 +81,6 @@
    public void setPlayTimeout(Long playTimeout) {
        this.playTimeout = playTimeout;
    }
    public void setWaitTrack(Boolean waitTrack) {
        this.waitTrack = waitTrack;
    }
    public void setInterfaceAuthentication(boolean interfaceAuthentication) {
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.auth;
import com.genersoft.iot.vmp.storager.impl.VideoManagerStoragerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -20,13 +21,24 @@
    @Autowired
    private SIPCommander cmder;
    @Autowired
    private VideoManagerStoragerImpl storager;
    
    public void onRegister(Device device) {
        // 只有第一次注册时调用查询设备信息,如需更新调用更新API接口
        // TODO 此处错误无法获取到通道
        Device device1 = storager.queryVideoDevice(device.getDeviceId());
        if (device.isFirsRegister()) {
            logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
            cmder.deviceInfoQuery(device);
            cmder.catalogQuery(device, null);
            try {
                Thread.sleep(100);
                cmder.deviceInfoQuery(device);
                Thread.sleep(100);
                cmder.catalogQuery(device, null);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -81,6 +81,10 @@
     */
    private boolean isPlay;
    private byte[] transaction;
    private byte[] dialog;
    public String getIp() {
        return ip;
    }
@@ -200,4 +204,20 @@
    public void setPlay(boolean play) {
        isPlay = play;
    }
    /**
     * @return the stored transaction bytes (the same array, not a copy); may be null if never set
     */
    public byte[] getTransaction() {
        return transaction;
    }
    /**
     * @param transaction transaction bytes to store; the array is kept by reference
     */
    public void setTransaction(byte[] transaction) {
        this.transaction = transaction;
    }
    /**
     * @return the stored dialog bytes (the same array, not a copy); may be null if never set
     */
    public byte[] getDialog() {
        return dialog;
    }
    /**
     * @param dialog dialog bytes to store; the array is kept by reference
     */
    public void setDialog(byte[] dialog) {
        this.dialog = dialog;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java
@@ -2,7 +2,10 @@
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -39,6 +42,9 @@
    @Autowired
    private SipSubscribe sipSubscribe;
    @Autowired
    private IVideoManagerStorager storager;
    public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
        super(listenerContainer, userSetup);
    }
@@ -61,15 +67,22 @@
        String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_";
        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
            String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
            publisher.platformKeepaliveExpireEventPublish(platformGBId);
            ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId);
            if (platform != null) {
                publisher.platformKeepaliveExpireEventPublish(platformGBId);
            }
        }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) {
            String platformGBId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length());
            publisher.platformRegisterCycleEventPublish(platformGBId);
            ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId);
            if (platform != null) {
                publisher.platformRegisterCycleEventPublish(platformGBId);
            }
        }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
            String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
            publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX);
            Device device = storager.queryVideoDevice(deviceId);
            if (device != null) {
                publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX);
            }
        }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) {
            String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length());
            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
@@ -2,8 +2,13 @@
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,6 +37,9 @@
    
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
    private VideoStreamSessionManager streamSession;
    
    @Autowired
    private RedisUtil redis;
@@ -41,6 +49,14 @@
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Override
    public void onApplicationEvent(OfflineEvent event) {
@@ -73,5 +89,15 @@
        // TODO 离线取消订阅
        // 离线释放所有ssrc
        List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null);
        if (ssrcTransactions.size() > 0) {
            for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
                mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
                mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
                streamSession.remove(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
@@ -75,7 +75,7 @@
                    stream.append(",");
                }
                stream.append(sendRtpItem.getStreamId());
                redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId());
                redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>();
                param.put("vhost", "__defaultVhost__");
@@ -84,9 +84,7 @@
                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
            }
        }
        Timer timer = new Timer();
        SipSubscribe.Event okEvent = (responseEvent)->{
            timer.cancel();
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
@@ -4,8 +4,6 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,7 +44,7 @@
        String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
            // 取消定时任务
            dynamicTask.stopCron(expiredKey);
            dynamicTask.stop(expiredKey);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
@@ -86,6 +86,15 @@
        return dialog;
    }
    public SIPDialog getDialogByCallId(String deviceId, String channelId, String callID){
        SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callID, null);
        if (ssrcTransaction == null) return null;
        byte[] dialogByteArray = ssrcTransaction.getDialog();
        if (dialogByteArray == null) return null;
        SIPDialog dialog = (SIPDialog)SerializeUtils.deSerialize(dialogByteArray);
        return dialog;
    }
    public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){
        if (StringUtils.isEmpty(callId)) callId ="*";
        if (StringUtils.isEmpty(stream)) stream ="*";
@@ -95,6 +104,21 @@
        return (SsrcTransaction)redisUtil.get((String) scanResult.get(0));
    }
    public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){
        if (StringUtils.isEmpty(deviceId)) deviceId ="*";
        if (StringUtils.isEmpty(channelId)) channelId ="*";
        if (StringUtils.isEmpty(callId)) callId ="*";
        if (StringUtils.isEmpty(stream)) stream ="*";
        String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream;
        List<Object> scanResult = redisUtil.scan(key);
        if (scanResult.size() == 0) return null;
        List<SsrcTransaction> result = new ArrayList<>();
        for (Object keyObj : scanResult) {
            result.add((SsrcTransaction)redisUtil.get((String) keyObj));
        }
        return result;
    }
    public String getMediaServerId(String deviceId, String channelId, String stream){
        SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream);
        if (ssrcTransaction == null) return null;
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
@@ -63,7 +63,5 @@
                }
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
@@ -96,4 +96,11 @@
     * @param recordInfo 录像信息
     */
    boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo);
    /**
     * Reply with a BYE to the upstream platform that initiated the play request
     * @param platform platform information
     * @param callId  callId
     */
    void streamByeCmd(ParentPlatform platform, String callId);
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -2,6 +2,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -84,6 +85,9 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private DynamicTask dynamicTask;
    /**
@@ -330,7 +334,8 @@
      * @param errorEvent sip错误订阅
      */
    @Override
    public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
    public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                              ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
        String streamId = ssrcInfo.getStream();
        try {
            if (device == null) return;
@@ -342,15 +347,13 @@
            subscribeKey.put("app", "rtp");
            subscribeKey.put("stream", streamId);
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
                if (event != null) {
                    event.response(mediaServerItemInUse, json);
                }
//                subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
            });
            //
            StringBuffer content = new StringBuffer(200);
@@ -419,7 +422,7 @@
            transmitRequest(device, request, (e -> {
                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                errorEvent.response(e);
            }), e ->{
                // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
@@ -458,8 +461,6 @@
            logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        System.out.println(344444);
                if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
                if (event != null) {
                    event.response(mediaServerItemInUse, json);
                }
@@ -565,7 +566,6 @@
            logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
                event.response(mediaServerItemInUse, json);
                subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
            });
@@ -662,6 +662,7 @@
    @Override
    public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) {
        try {
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
            ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream);
            if (transaction == null) {
                logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
@@ -715,10 +716,9 @@
            dialog.sendRequest(clientTransaction);
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callIdHeader.getCallId(), null);
            if (ssrcTransaction != null) {
                MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
                mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc());
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
                mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream());
                streamSession.remove(deviceId, channelId, ssrcTransaction.getStream());
            }
@@ -1169,8 +1169,6 @@
     */ 
    @Override
    public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
        // 清空通道
//        storager.cleanChannelsForDevice(device.getDeviceId());
        try {
            StringBuffer catalogXml = new StringBuffer(200);
            catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -5,8 +5,16 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -18,10 +26,14 @@
import org.springframework.util.StringUtils;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ViaHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request;
import java.lang.reflect.Field;
import java.text.ParseException;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
@@ -38,17 +50,23 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private SipSubscribe sipSubscribe;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Lazy
    @Autowired
    @Qualifier(value="tcpSipProvider")
    private SipProvider tcpSipProvider;
    private SipProviderImpl tcpSipProvider;
    @Lazy
    @Autowired
    @Qualifier(value="udpSipProvider")
    private SipProvider udpSipProvider;
    private SipProviderImpl udpSipProvider;
    @Override
    public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
@@ -57,13 +75,12 @@
    /**
     * Sets the platform's expires value to "0", writes the modified platform into its cached
     * entry when one exists, and then delegates to the six-argument register overload.
     * @param parentPlatform the platform to unregister from; its expires field is modified
     * @param errorEvent passed through to register
     * @param okEvent passed through to register
     * @return whatever the delegated register call returns
     */
    @Override
    public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
        parentPlatform.setExpires("0");
        ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
        if (parentPlatformCatch != null) {
            parentPlatformCatch.setParentPlatform(parentPlatform);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }
        // NOTE(review): expires is already set to "0" at the top of the method; this second call
        // looks like the old and new position of the same line shown together - confirm.
        parentPlatform.setExpires("0");
        return register(parentPlatform, null, null, errorEvent, okEvent, false);
    }
@@ -543,4 +560,59 @@
        }
        return true;
    }
    @Override
    public void streamByeCmd(ParentPlatform platform, String callId) {
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
        if (sendRtpItem != null) {
            String mediaServerId = sendRtpItem.getMediaServerId();
            MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
            if (mediaServerItem != null) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
                zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId());
            }
            byte[] dialogByteArray = sendRtpItem.getDialog();
            if (dialogByteArray != null) {
                SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
                SipStack sipStack = udpSipProvider.getSipStack();
                SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
                if (dialog != sipDialog) {
                    dialog = sipDialog;
                } else {
                    try {
                        dialog.setSipProvider(udpSipProvider);
                        Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
                        sipStackField.setAccessible(true);
                        sipStackField.set(dialog, sipStack);
                        Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
                        eventListenersField.setAccessible(true);
                        eventListenersField.set(dialog, new HashSet<>());
                        byte[] transactionByteArray = sendRtpItem.getTransaction();
                        ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
                        Request byeRequest = dialog.createRequest(Request.BYE);
                        SipURI byeURI = (SipURI) byeRequest.getRequestURI();
                        SIPRequest request = (SIPRequest) clientTransaction.getRequest();
                        byeURI.setHost(request.getRemoteAddress().getHostName());
                        byeURI.setPort(request.getRemotePort());
                        if ("TCP".equals(platform.getTransport())) {
                            clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
                        } else if ("UDP".equals(platform.getTransport())) {
                            clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
                        }
                        dialog.sendRequest(clientTransaction);
                    } catch (SipException e) {
                        e.printStackTrace();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    } catch (NoSuchFieldException e) {
                        e.printStackTrace();
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@@ -22,6 +23,7 @@
import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
@@ -60,6 +62,9 @@
    @Autowired
    private ZLMHttpHookSubscribe subscribe;
    @Autowired
    private DynamicTask dynamicTask;
    /**   
     * 处理  ACK请求
@@ -68,13 +73,16 @@
     */
    @Override
    public void process(RequestEvent evt) {
        logger.info("ACK请求: {}", ((System.currentTimeMillis())));
        Dialog dialog = evt.getDialog();
        CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
        if (dialog == null) return;
        if (dialog.getState()== DialogState.CONFIRMED) {
            String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
            logger.info("ACK请求: platformGbId->{}", platformGbId);
            // 取消设置的超时任务
            dynamicTask.stop(callIdHeader.getCallId());
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
            String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
            String deviceId = sendRtpItem.getDeviceId();
            StreamInfo streamInfo = null;
@@ -83,15 +91,12 @@
            }else {
                streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
            }
            System.out.println(JSON.toJSON(streamInfo));
            if (streamInfo == null) {
                streamInfo = new StreamInfo();
                streamInfo.setApp(sendRtpItem.getApp());
                streamInfo.setStream(sendRtpItem.getStreamId());
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
            logger.info(platformGbId);
            logger.info(channelId);
            Map<String, Object> param = new HashMap<>();
            param.put("vhost","__defaultVhost__");
            param.put("app",streamInfo.getApp());
@@ -100,42 +105,23 @@
            param.put("dst_url",sendRtpItem.getIp());
            param.put("dst_port", sendRtpItem.getPort());
            param.put("is_udp", is_Udp);
            // 设备推流查询,成功后才能转推
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
//            if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
//                logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
//                        streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
//                zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
//            } else {
//                // 对hook进行订阅
//                logger.info("等待设备推流[{}/{}].......",
//                        streamInfo.getApp(), streamInfo.getStreamId());
//                Timer timer = new Timer();
//                timer.schedule(new TimerTask() {
//                    @Override
//                    public void run() {
//                        logger.info("设备推流[{}/{}]超时,终止向上级推流",
//                                finalStreamInfo.getApp() , finalStreamInfo.getStreamId());
//
//                    }
//                }, 30*1000L);
//                // 添加订阅
//                JSONObject subscribeKey = new JSONObject();
//                subscribeKey.put("app", "rtp");
//                subscribeKey.put("stream", streamInfo.getStreamId());
//                subscribeKey.put("mediaServerId", streamInfo.getMediaServerId());
//                subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey,
//                        (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
//                            logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
//                                    finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
//                            timer.cancel();
//                            zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
//                            subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
//                        });
//            }
            JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
            if (jsonObject.getInteger("code") != 0) {
                logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream());
                // 监听流上线
                // 添加订阅
                JSONObject subscribeKey = new JSONObject();
                subscribeKey.put("app", "rtp");
                subscribeKey.put("stream", streamInfo.getStream());
                subscribeKey.put("regist", true);
                subscribeKey.put("schema", "rtmp");
                subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
                subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                        (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                            zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                        });
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -4,6 +4,8 @@
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@@ -13,6 +15,8 @@
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -21,6 +25,7 @@
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
@@ -56,6 +61,9 @@
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -71,11 +79,12 @@
        try {
            responseAck(evt, Response.OK);
            Dialog dialog = evt.getDialog();
            CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
            if (dialog == null) return;
            if (dialog.getState().equals(DialogState.TERMINATED)) {
                String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
                String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId);
                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
                logger.info("收到bye, [{}/{}]", platformGbId, channelId);
                if (sendRtpItem != null){
                    String streamId = sendRtpItem.getStreamId();
@@ -87,35 +96,44 @@
                    logger.info("停止向上级推流:" + streamId);
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                    redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
                    redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
                    int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
                    if (totalReaderCount == 0) {
                    if (totalReaderCount <= 0) {
                        logger.info(streamId + "无其它观看者,通知设备停止推流");
                        cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId);
                    }else if (totalReaderCount == -1){
                        logger.warn(streamId + " 查找其它观看者失败");
                    }
                }
                // 可能是设备主动停止
                Device device = storager.queryVideoDeviceByChannelId(platformGbId);
                if (device != null) {
                    storager.stopPlay(device.getDeviceId(), channelId);
                    StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                    if (sendRtpItem != null) {
                        if (sendRtpItem.isPlay()) {
                            if (streamInfo != null) {
                                redisCatchStorage.stopPlay(streamInfo);
                            }
                        }else {
                            if (streamInfo != null) {
                                redisCatchStorage.stopPlayback(streamInfo);
                            }
                        }
                        storager.stopPlay(device.getDeviceId(), channelId);
                    if (streamInfo != null) {
                        redisCatchStorage.stopPlay(streamInfo);
                        mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
                    }
                    SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
                    if (ssrcTransactionForPlay != null){
                        SIPDialog dialogForPlay = (SIPDialog) SerializeUtils.deSerialize(ssrcTransactionForPlay.getDialog());
                        if (dialogForPlay.getCallId().equals(callIdHeader.getCallId())){
                            // 释放ssrc
                            MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
                            if (mediaServerItem != null) {
                                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
                            }
                            streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
                        }
                    }
                    SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
                    if (ssrcTransactionForPlayBack != null) {
                        // 释放ssrc
                        MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
                        if (mediaServerItem != null) {
                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
                        }
                        streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
                    }
                }
            }
        } catch (SipException e) {
            e.printStackTrace();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -21,6 +22,7 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
@@ -67,6 +69,9 @@
    @Autowired
    private IRedisCatchStorage  redisCatchStorage;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private SIPCommander cmder;
@@ -257,11 +262,13 @@
                    }
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setPlay("Play".equals(sessionName));
                    byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                    sendRtpItem.setDialog(dialogByteArray);
                    byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                    sendRtpItem.setTransaction(transactionByteArray);
                    // 写入redis, 超时时回复
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    Device finalDevice = device;
                    MediaServerItem finalMediaServerItem = mediaServerItem;
                    Long finalStartTime = startTime;
                    Long finalStopTime = stopTime;
                    ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
@@ -289,7 +296,15 @@
                        content.append("f=\r\n");
                        try {
                            // 超时未收到Ack应该回复bye,当前等待时间为10秒
                            dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
                                logger.info("Ack 等待超时");
                                mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc);
                                // 回复bye
                                cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                            }, 60);
                            responseSdpAck(evt, content.toString(), platform);
                        } catch (SipException e) {
                            e.printStackTrace();
                        } catch (InvalidArgumentException e) {
@@ -320,6 +335,7 @@
                                if (result.getEvent() != null) {
                                    errorEvent.response(result.getEvent());
                                }
                                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                                try {
                                    responseAck(evt, Response.REQUEST_TIMEOUT);
                                } catch (SipException e) {
@@ -343,7 +359,9 @@
                                sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
                            }
                            sendRtpItem.setPlay(false);
                            playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent);
                            playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{
                                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                            });
                        }else {
                            sendRtpItem.setStreamId(streamInfo.getStream());
                            hookEvent.response(mediaServerItem, null);
@@ -365,6 +383,11 @@
                    // 写入redis, 超时时回复
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                    sendRtpItem.setDialog(dialogByteArray);
                    byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                    sendRtpItem.setTransaction(transactionByteArray);
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    StringBuffer content = new StringBuffer(200);
                    content.append("v=0\r\n");
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -158,6 +158,10 @@
                        device.setCharset("gb2312");
                        device.setDeviceId(deviceId);
                        device.setFirsRegister(true);
                    }else {
                        if (device.getOnline() == 0) {
                            device.setFirsRegister(true);
                        }
                    }
                    device.setIp(received);
                    device.setPort(rPort);
@@ -187,7 +191,6 @@
            if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
            // 注册成功
            // 保存到redis
            // 下发catelog查询目录
            if (registerFlag == 1 ) {
                logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -27,9 +27,7 @@
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.Header;
import javax.sip.header.ToHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
@@ -139,18 +137,16 @@
        if (subscribeInfo.getExpires() > 0) {
            if (redisCatchStorage.getSubscribe(key) != null) {
                dynamicTask.stopCron(key);
                dynamicTask.stop(key);
            }
            String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
            dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key), Integer.parseInt(interval));
            redisCatchStorage.updateSubscribe(key, subscribeInfo);
        }else if (subscribeInfo.getExpires() == 0) {
            dynamicTask.stopCron(key);
            dynamicTask.stop(key);
            redisCatchStorage.delSubscribe(key);
        }
        try {
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -85,19 +85,18 @@
            redisCatchStorage.delPlatformRegisterInfo(callId);
            parentPlatform.setStatus("注册".equals(action));
            // 取回Expires设置,避免注销过程中被置为0
            ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
            String expires = parentPlatformTmp.getExpires();
            parentPlatform.setExpires(expires);
            parentPlatform.setId(parentPlatformTmp.getId());
            if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) {
                ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
                String expires = parentPlatformTmp.getExpires();
                parentPlatform.setExpires(expires);
                parentPlatform.setId(parentPlatformTmp.getId());
                redisCatchStorage.updatePlatformRegister(parentPlatform);
                redisCatchStorage.updatePlatformKeepalive(parentPlatform);
                parentPlatformCatch.setParentPlatform(parentPlatform);
                redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
            }
            storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
            redisCatchStorage.updatePlatformRegister(parentPlatform);
            redisCatchStorage.updatePlatformKeepalive(parentPlatform);
            parentPlatformCatch.setParentPlatform(parentPlatform);
            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
        }
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -489,7 +489,7 @@
        }
        String mediaServerId = json.getString("mediaServerId");
        MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
        if (userSetup.isAutoApplyPlay() && mediaInfo != null) {
        if (userSetup.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) {
            String app = json.getString("app");
            String streamId = json.getString("stream");
            if ("rtp".equals(app)) {
@@ -499,28 +499,16 @@
                    String channelId = s[1];
                    Device device = redisCatchStorage.getDevice(deviceId);
                    if (device != null) {
                        UUID uuid = UUID.randomUUID();
                        SSRCInfo ssrcInfo;
                        String streamId2 = null;
                        if (mediaInfo.isRtpEnable()) {
                            streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
                        }
                        ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2);
                        cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                            logger.info("收到订阅消息: " + response.toJSONString());
                            playService.onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString());
                        }, null);
                        playService.play(mediaInfo,deviceId, channelId, null, null, null);
                    }
                }
            }
        }
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
        ret.put("msg", "success");
        return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
        return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
    }
    
    /**
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -205,7 +205,7 @@
    /**
     * 调用zlm RESTful API —— startSendRtp
     */
    public Boolean startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
    public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
        Boolean result = false;
        JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param);
        if (jsonObject == null) {
@@ -216,7 +216,7 @@
        } else {
            logger.error("RTP推流失败: " + jsonObject.getString("msg"));
        }
        return result;
        return jsonObject;
    }
    /**
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.event;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import org.slf4j.Logger;
@@ -34,6 +35,9 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private IPlayService playService;
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Async
@@ -55,6 +59,6 @@
        mediaServerService.zlmServerOffline(event.getMediaServerId());
        streamProxyService.zlmServerOffline(event.getMediaServerId());
        streamPushService.zlmServerOffline(event.getMediaServerId());
        // TODO 处理对国标的影响
        playService.zlmServerOffline(event.getMediaServerId());
    }
}
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -58,7 +58,7 @@
    void removeCount(String mediaServerId);
    void releaseSsrc(MediaServerItem mediaServerItem, String ssrc);
    void releaseSsrc(String mediaServerItemId, String ssrc);
    void clearMediaServerForOnline();
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -17,11 +17,13 @@
    void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
    PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
    PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
    MediaServerItem getNewMediaServerItem(Device device);
    void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString);
    DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack);
    void zlmServerOffline(String mediaServerId);
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -52,11 +52,9 @@
            return false;
        }
        logger.info("移除目录订阅: {}", device.getDeviceId());
        dynamicTask.stopCron(device.getDeviceId());
        dynamicTask.stop(device.getDeviceId());
        device.setSubscribeCycleForCatalog(0);
        sipCommander.catalogSubscribe(device, null, null);
        // 清空cseq计数
        return true;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -167,13 +167,14 @@
        if (mediaServerItem != null) {
            String streamId = String.format("%s_%s", deviceId, channelId);
            zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
            releaseSsrc(mediaServerItem, ssrc);
            releaseSsrc(mediaServerItem.getId(), ssrc);
        }
        streamSession.remove(deviceId, channelId, stream);
    }
    @Override
    public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) {
    public void releaseSsrc(String mediaServerItemId, String ssrc) {
        MediaServerItem mediaServerItem = getOne(mediaServerItemId);
        if (mediaServerItem == null || ssrc == null) {
            return;
        }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -5,13 +5,13 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -37,8 +37,7 @@
import org.springframework.web.context.request.async.DeferredResult;
import java.io.FileNotFoundException;
import java.util.Objects;
import java.util.UUID;
import java.util.*;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@@ -51,6 +50,9 @@
    @Autowired
    private SIPCommander cmder;
    @Autowired
    private SIPCommanderFroPlatform sipCommanderFroPlatform;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -78,7 +80,9 @@
    @Override
    public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
    public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                           ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           Runnable timeoutCallback) {
        PlayResult playResult = new PlayResult();
        RequestMessage msg = new RequestMessage();
        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
@@ -101,29 +105,10 @@
        Device device = redisCatchStorage.getDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        playResult.setDevice(device);
        // 超时处理
        result.onTimeout(()->{
            logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            WVPResult wvpResult = new WVPResult();
            wvpResult.setCode(-1);
            SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, streamInfo.getStream());
            if (dialog != null) {
                wvpResult.setMsg("收流超时,请稍候重试");
            }else {
                wvpResult.setMsg("点播超时,请稍候重试");
            }
            msg.setData(wvpResult);
            // 点播超时回复BYE
            cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream());
            // 释放rtpserver
            mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, streamInfo.getStream());
            // 回复之前所有的点播请求
            resultHolder.invokeAllResult(msg);
            // TODO 释放ssrc
        });
        result.onCompletion(()->{
            // 点播结束时调用截图接口
            // TODO 应该在上流时调用更好,结束也可能是错误结束
            try {
                String classPath = ResourceUtils.getURL("classpath:").getPath();
                // 兼容打包为jar的class路径
@@ -161,31 +146,60 @@
            if (mediaServerItem.isRtpEnable()) {
                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            }
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
            // 超时处理
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
                    if (timeoutCallback != null) {
                        timeoutCallback.run();
                    }
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(-1);
                    SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
                    if (dialog != null) {
                        wvpResult.setMsg("收流超时,请稍候重试");
                        // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                        cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
                    }else {
                        wvpResult.setMsg("点播超时,请稍候重试");
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                        mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                        streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                    }
                    msg.setData(wvpResult);
                    // 回复之前所有的点播请求
                    resultHolder.invokeAllResult(msg);
                }
            }, userSetup.getPlayTimeout());
            // 发送点播消息
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                logger.info("收到订阅消息: " + response.toJSONString());
                timer.cancel();
                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid);
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, response);
                }
            }, (event) -> {
                timer.cancel();
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                // 点播返回sip错误
                mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
                if (errorEvent != null) {
                    errorEvent.response(event);
                }
            });
        } else {
            String streamId = streamInfo.getStream();
@@ -222,13 +236,41 @@
                    streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
                }
                SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
                // 超时处理
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
                        if (timeoutCallback != null) {
                            timeoutCallback.run();
                        }
                        WVPResult wvpResult = new WVPResult();
                        wvpResult.setCode(-1);
                        SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
                        if (dialog != null) {
                            wvpResult.setMsg("收流超时,请稍候重试");
                            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                            cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
                        }else {
                            wvpResult.setMsg("点播超时,请稍候重试");
                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                            mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                            streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                        }
                        msg.setData(wvpResult);
                        // 回复之前所有的点播请求
                        resultHolder.invokeAllResult(msg);
                    }
                }, userSetup.getPlayTimeout());
                cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                    logger.info("收到订阅消息: " + response.toJSONString());
                    onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
                }, (event) -> {
                    mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(-1);
@@ -306,14 +348,23 @@
        msg.setId(uuid);
        msg.setKey(key);
        PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
        result.onTimeout(()->{
            msg.setData("回放超时");
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
            callback.call(playBackResult);
        });
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
                playBackResult.setCode(-1);
                playBackResult.setData(msg);
                callback.call(playBackResult);
                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
                // 回复之前所有的点播请求
                callback.call(playBackResult);
            }
        }, userSetup.getPlayTimeout());
        cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> {
            logger.info("收到订阅消息: " + response.toJSONString());
            timer.cancel();
            StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
            if (streamInfo == null) {
                logger.warn("设备回放API调用失败!");
@@ -331,6 +382,7 @@
            playBackResult.setResponse(response);
            callback.call(playBackResult);
        }, event -> {
            timer.cancel();
            msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
@@ -370,4 +422,26 @@
        return streamInfo;
    }
    @Override
    public void zlmServerOffline(String mediaServerId) {
        // 处理正在向上推流的上级平台
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
        if (sendRtpItems.size() > 0) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
                    ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                    sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
                }
            }
        }
        // 处理正在观看的国标设备
        List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
        if (allSsrc.size() > 0) {
            for (SsrcTransaction ssrcTransaction : allSsrc) {
                if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
                    cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
                }
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -89,7 +89,7 @@
     * @param channelId
     * @return sendRtpItem
     */
    SendRtpItem querySendRTPServer(String platformGbId, String channelId);
    SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId);
    List<SendRtpItem> querySendRTPServer(String platformGbId);
@@ -98,7 +98,7 @@
     * @param platformGbId
     * @param channelId
     */
    void deleteSendRTPServer(String platformGbId, String channelId);
    void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId);
    /**
     * 查询某个通道是否存在上级点播(RTP推送)
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -135,6 +135,32 @@
            "'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " +
            "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" +
            "</foreach> " +
            "ON DUPLICATE KEY UPDATE " +
            "updateTime=VALUES(updateTime), " +
            "name=VALUES(name), " +
            "manufacture=VALUES(manufacture), " +
            "model=VALUES(model), " +
            "owner=VALUES(owner), " +
            "civilCode=VALUES(civilCode), " +
            "block=VALUES(block), " +
            "subCount=VALUES(subCount), " +
            "address=VALUES(address), " +
            "parental=VALUES(parental), " +
            "parentId=VALUES(parentId), " +
            "safetyWay=VALUES(safetyWay), " +
            "registerWay=VALUES(registerWay), " +
            "certNum=VALUES(certNum), " +
            "certifiable=VALUES(certifiable), " +
            "errCode=VALUES(errCode), " +
            "secrecy=VALUES(secrecy), " +
            "ipAddress=VALUES(ipAddress), " +
            "port=VALUES(port), " +
            "password=VALUES(password), " +
            "PTZType=VALUES(PTZType), " +
            "status=VALUES(status), " +
            "streamId=VALUES(streamId), " +
            "longitude=VALUES(longitude), " +
            "latitude=VALUES(latitude)" +
            "</script>")
    int batchAdd(List<DeviceChannel> addChannels);
@@ -211,4 +237,15 @@
            "        from device_channel\n" +
            "        where deviceId = #{deviceId}")
    List<DeviceChannelTree> tree(String deviceId);
    @Delete(value = {" <script>" +
            "DELETE " +
            "from " +
            "device_channel " +
            "WHERE " +
            "deviceId = #{deviceId} " +
            " AND channelId NOT IN " +
            "<foreach collection='channels'  item='item'  open='(' separator=',' close=')' > #{item.channelId}</foreach>" +
            " </script>"})
    int cleanChannelsNotInList(String deviceId, List<DeviceChannel> channels);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -18,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -276,19 +277,32 @@
    @Override
    public void updateSendRTPSever(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId();
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_"
                + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_"
                + sendRtpItem.getStreamId() + "_" + sendRtpItem.getCallId();
        redis.set(key, sendRtpItem);
    }
    @Override
    public SendRtpItem querySendRTPServer(String platformGbId, String channelId) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId;
        return (SendRtpItem)redis.get(key);
    public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) platformGbId = "*";
        if (channelId == null) channelId = "*";
        if (streamId == null) streamId = "*";
        if (callId == null) callId = "*";
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId
                + "_" + channelId + "_" + streamId + "_" + callId;
        List<Object> scan = redis.scan(key);
        if (scan.size() > 0) {
            return (SendRtpItem)redis.get((String)scan.get(0));
        }else {
            return null;
        }
    }
    @Override
    public List<SendRtpItem> querySendRTPServer(String platformGbId) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*";
        if (platformGbId == null) platformGbId = "*";
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*" + "_*" + "_*";
        List<Object> queryResult = redis.scan(key);
        List<SendRtpItem> result= new ArrayList<>();
@@ -306,10 +320,20 @@
     * @param channelId
     */
    @Override
    public void deleteSendRTPServer(String platformGbId, String channelId) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId;
        redis.del(key);
    public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) {
        if (streamId == null) streamId = "*";
        if (callId == null) callId = "*";
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId
                + "_" + channelId + "_" + streamId + "_" + callId;
        List<Object> scan = redis.scan(key);
        if (scan.size() > 0) {
            for (Object keyStr : scan) {
                redis.del((String)keyStr);
            }
        }
    }
    /**
     * 查询某个通道是否存在上级点播(RTP推送)
@@ -317,7 +341,7 @@
     */
    @Override
    public boolean isChannelSendingRTP(String channelId) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId;
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId + "*_" + "*_";
        List<Object> RtpStreams = redis.scan(key);
        if (RtpStreams.size() > 0) {
            return true;
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -284,7 +284,8 @@
            logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder);
        }
        try {
            int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
//            int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
            int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
            int limitCount = 300;
            boolean result = cleanChannelsResult < 0;
            if (!result && channels.size() > 0) {
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.gb28181.device;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
@@ -13,7 +14,6 @@
import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@@ -56,6 +56,9 @@
    @Autowired
    private IDeviceService deviceService;
    @Autowired
    private DynamicTask dynamicTask;
    /**
     * 使用ID查询国标设备
@@ -209,6 +212,8 @@
        boolean isSuccess = storager.delete(deviceId);
        if (isSuccess) {
            redisCatchStorage.clearCatchByDeviceId(deviceId);
            // 停止此设备的订阅更新
            dynamicTask.stop(deviceId);
            JSONObject json = new JSONObject();
            json.put("deviceId", deviceId);
            return new ResponseEntity<>(json.toString(),HttpStatus.OK);
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java
@@ -2,8 +2,9 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -40,6 +41,9 @@
    private final static Logger logger = LoggerFactory.getLogger(PlatformController.class);
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private IVideoManagerStorager storager;
    @Autowired
@@ -50,6 +54,9 @@
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private DynamicTask dynamicTask;
    /**
     * 获取国标服务的配置
@@ -222,7 +229,7 @@
        if (updateResult) {
            // 保存时启用就发送注册
            if (parentPlatform.isEnable()) {
                if (parentPlatformOld.isStatus()) {
                if (parentPlatformOld != null && parentPlatformOld.isStatus()) {
                    commanderForPlatform.unregister(parentPlatformOld, null, null);
                    try {
                        Thread.sleep(500);
@@ -287,8 +294,9 @@
        boolean deleteResult = storager.deleteParentPlatform(parentPlatform);
        storager.delCatalogByPlatformId(parentPlatform.getServerGBId());
        storager.delRelationByPlatformId(parentPlatform.getServerGBId());
        // 停止发送位置订阅定时任务
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_MobilePosition_" + parentPlatform.getServerGBId();
        dynamicTask.stop(key);
        if (deleteResult) {
            return new ResponseEntity<>("success", HttpStatus.OK);
        } else {
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -88,7 +88,7 @@
        // 获取可用的zlm
        Device device = storager.queryVideoDevice(deviceId);
        MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
        PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null);
        PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
        return playResult.getResult();
    }
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
@@ -150,7 +150,7 @@
            JSONObject result = new JSONObject();
            result.put("error", "channel[ " + code + " ] " + eventResult.msg);
            resultDeferredResult.setResult(result);
        });
        }, null);
        return resultDeferredResult;
    }
src/main/resources/all-application.yml
@@ -170,8 +170,6 @@
    save-position-history: false
    # 点播等待超时时间,单位:毫秒
    play-timeout: 3000
    # 等待音视频编码信息再返回, true: 可以根据编码选择合适的播放器,false: 可以更快点播
    wait-track: false
    # 是否开启接口鉴权
    interface-authentication: true
    # 自动配置redis 可以过期事件