648540858
2022-03-10 c1d7f867c2ffcb1364334a5e013eb8f208819ef5
优化目录订阅以及国标级联目录订阅回复
24个文件已修改
1个文件已添加
659 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java 106 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 143 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 60 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java
@@ -60,8 +60,8 @@
    @Value("${media.secret}")
    private String secret;
    @Value("${media.stream-none-reader-delay-ms:18000}")
    private int streamNoneReaderDelayMS = 18000;
    @Value("${media.stream-none-reader-delay-ms:10000}")
    private int streamNoneReaderDelayMS = 10000;
    @Value("${media.rtp.enable}")
    private boolean rtpEnable;
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -47,7 +47,7 @@
        Properties properties = new Properties();
        properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP");
        properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp());
        properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false");
        properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
        /**
         * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE =
         * 0; public static final int TRACE_MESSAGES = 16; public static final int
@@ -57,6 +57,7 @@
        properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log");
        properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log");
        sipStack = (SipStackImpl) sipFactory.createSipStack(properties);
        return sipStack;
    }
@@ -70,6 +71,7 @@
            tcpSipProvider = (SipProviderImpl)sipStack.createSipProvider(tcpListeningPoint);
            tcpSipProvider.setDialogErrorsAutomaticallyHandled();
            tcpSipProvider.addSipListener(sipProcessorObserver);
//            tcpSipProvider.setAutomaticDialogSupportEnabled(false);
            logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}");
        } catch (TransportNotSupportedException e) {
            e.printStackTrace();
@@ -93,6 +95,7 @@
            udpListeningPoint = sipStack.createListeningPoint(sipConfig.getMonitorIp(), sipConfig.getPort(), "UDP");
            udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint);
            udpSipProvider.addSipListener(sipProcessorObserver);
//            udpSipProvider.setAutomaticDialogSupportEnabled(false);
        } catch (TransportNotSupportedException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java
@@ -27,18 +27,18 @@
    
    /**
     * Registration hook: when the device reports a first registration, queries its
     * device information and then its catalog, pausing 100 ms before each command.
     *
     * @param device the device that has just registered
     */
    public void onRegister(Device device) {
        // Device information is queried only on the first registration; use the update API to refresh it later.
        // TODO this is wrong: the channels cannot be retrieved here
        // NOTE(review): device1 is never read after this lookup.
        Device device1 = storager.queryVideoDevice(device.getDeviceId());
        if (device.isFirsRegister()) {
            logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
            try {
                Thread.sleep(100);
                cmder.deviceInfoQuery(device);
                Thread.sleep(100);
                cmder.catalogQuery(device, null);
            } catch (InterruptedException e) {
                // NOTE(review): the interruption is only printed; the thread's interrupt flag is not restored.
                e.printStackTrace();
            }
        }
        // The commented-out block below repeats the statements above.
//        // TODO this is wrong: the channels cannot be retrieved here
//        Device device1 = storager.queryVideoDevice(device.getDeviceId());
//        if (device.isFirsRegister()) {
//            logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
//            try {
//                Thread.sleep(100);
//                cmder.deviceInfoQuery(device);
//                Thread.sleep(100);
//                cmder.catalogQuery(device, null);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -220,4 +220,6 @@
    /**
     * Stores the given byte array as this item's {@code dialog} value. The array is kept
     * by reference, not copied.
     *
     * @param dialog bytes to keep; presumably a serialized SIP dialog - confirm against callers
     */
    public void setDialog(byte[] dialog) {
        this.dialog = dialog;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
New file
@@ -0,0 +1,37 @@
package com.genersoft.iot.vmp.gb28181.bean;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SubscribeHolder {
    private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
    public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
        catalogMap.put(platformId, subscribeInfo);
    }
    public SubscribeInfo getCatalogSubscribe(String platformId) {
        return catalogMap.get(platformId);
    }
    public void removeCatalogSubscribe(String platformId) {
        catalogMap.remove(platformId);
    }
    public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
        mobilePositionMap.put(platformId, subscribeInfo);
    }
    public SubscribeInfo getMobilePositionSubscribe(String platformId) {
        return mobilePositionMap.get(platformId);
    }
    public void removeMobilePositionSubscribe(String platformId) {
        mobilePositionMap.remove(platformId);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
@@ -1,13 +1,15 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import javax.sip.Dialog;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.header.*;
import javax.sip.message.Request;
public class SubscribeInfo {
    /** Creates an instance without populating any field; values are supplied later through the setters. */
    public SubscribeInfo() {
    }
    public SubscribeInfo(RequestEvent evt, String id) {
        this.id = id;
@@ -23,6 +25,8 @@
        this.eventType = eventHeader.getEventType();
        ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME);
        this.branch = viaHeader.getBranch();
        this.transaction = evt.getServerTransaction();
        this.dialog = evt.getDialog();
    }
    private String id;
@@ -33,6 +37,8 @@
    private String fromTag;
    private String toTag;
    private String branch;
    private ServerTransaction transaction;
    private Dialog dialog;
    public String getId() {
        return id;
@@ -97,4 +103,20 @@
    /**
     * Sets the branch value of this subscription.
     *
     * @param branch branch string to store (the event-based constructor takes it from the request's Via header)
     */
    public void setBranch(String branch) {
        this.branch = branch;
    }
    /**
     * Returns the server transaction held by this subscription.
     *
     * @return the stored transaction; may be {@code null} if none was set
     */
    public ServerTransaction getTransaction() {
        return transaction;
    }
    /**
     * Replaces the server transaction held by this subscription.
     *
     * @param transaction transaction to store
     */
    public void setTransaction(ServerTransaction transaction) {
        this.transaction = transaction;
    }
    /**
     * Returns the SIP dialog held by this subscription.
     *
     * @return the stored dialog; may be {@code null} if none was set
     */
    public Dialog getDialog() {
        return dialog;
    }
    /**
     * Replaces the SIP dialog held by this subscription.
     *
     * @param dialog dialog to store
     */
    public void setDialog(Dialog dialog) {
        this.dialog = dialog;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -33,12 +33,20 @@
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    
    /**
     * Publishes an {@link OnlineEvent} carrying the device, the origin of the event and an
     * expires value through the application event publisher.
     *
     * @param device  device that came online
     * @param from    origin of the event; the online listener switches on this value
     * @param expires value copied into the event; presumably the registration validity in seconds - confirm
     */
    public void onlineEventPublish(Device device, String from, int expires) {
        OnlineEvent onEvent = new OnlineEvent(this);
        onEvent.setDevice(device);
        onEvent.setFrom(from);
        onEvent.setExpires(expires);
        applicationEventPublisher.publishEvent(onEvent);
    }
    /**
     * Publishes an {@link OnlineEvent} carrying the device and the origin of the event through
     * the application event publisher. No expires value is set on the event.
     *
     * @param device device that came online
     * @param from   origin of the event; the online listener switches on this value
     */
    public void onlineEventPublish(Device device, String from) {
        OnlineEvent onEvent = new OnlineEvent(this);
        onEvent.setDevice(device);
        onEvent.setFrom(from);
        applicationEventPublisher.publishEvent(onEvent);
    }
        applicationEventPublisher.publishEvent(onEvent);
    }
    
    public void outlineEventPublish(String deviceId, String from){
        OfflineEvent outEvent = new OfflineEvent(this);
@@ -107,6 +115,12 @@
    }
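    /**
     * Creates a {@link CatalogEvent} describing a catalog change for the given channels.
     *
     * @param platformId     id of the platform concerned; callers in this project also pass
     *                       {@code null} (presumably meaning "no specific platform" - confirm)
     * @param deviceChannels channels affected by the change
     * @param type           catalog event type, for example {@code CatalogEvent.ON}
     */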
    public void catalogEventPublish(String platformId, List<DeviceChannel> deviceChannels, String type) {
        CatalogEvent outEvent = new CatalogEvent(this);
        List<DeviceChannel> channels = new ArrayList<>();
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
@@ -91,7 +91,7 @@
        // 离线释放所有ssrc
        List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null);
        if (ssrcTransactions.size() > 0) {
        if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
            for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
                mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
                mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java
@@ -23,6 +23,8 @@
    
    private String from;
    private int  expires;
    /**
     * Returns the device this event is about.
     *
     * @return the device attached to the event; may be {@code null} if none was set
     */
    public Device getDevice() {
        return device;
    }
@@ -38,5 +40,12 @@
    /**
     * Sets the origin of this event.
     *
     * @param from origin marker; the online listener compares it with the
     *             {@code VideoManagerConstants.EVENT_ONLINE_*} constants
     */
    public void setFrom(String from) {
        this.from = from;
    }
    /**
     * Returns the expires value carried by this event.
     *
     * @return the stored value; presumably a validity period in seconds - confirm
     */
    public int getExpires() {
        return expires;
    }
    /**
     * Sets the expires value carried by this event.
     *
     * @param expires value to store; presumably a validity period in seconds - confirm
     */
    public void setExpires(int expires) {
        this.expires = expires;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -6,6 +6,7 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.dao.dto.User;
import org.slf4j.Logger;
@@ -51,6 +52,9 @@
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private SIPCommander cmder;
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
@@ -62,13 +66,21 @@
        Device device = event.getDevice();
        if (device == null) return;
        String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
        Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
        device.setOnline(1);
        // 处理上线监听
        storager.updateDevice(device);
        switch (event.getFrom()) {
        // 注册时触发的在线事件,先在redis中增加超时超时监听
        case VideoManagerConstants.EVENT_ONLINE_REGISTER:
            // 超时时间
            redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut());
            device.setRegisterTime(format.format(System.currentTimeMillis()));
            if (deviceInStore == null) { //第一次上线
                logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
                cmder.deviceInfoQuery(device);
                cmder.catalogQuery(device, null);
            }
            break;
        // 设备主动发送心跳触发的在线事件
        case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE:
@@ -87,19 +99,11 @@
            break;
        }
        device.setOnline(1);
        Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
        if (deviceInStore != null && deviceInStore.getOnline() == 0) {
            List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
            eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
        }
        // 处理上线监听
        storager.updateDevice(device);
        List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
        eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
        // 上线添加订阅
        if (device.getSubscribeCycleForCatalog() > 0) {
            deviceService.addCatalogSubscribe(device);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -52,6 +52,9 @@
    @Autowired
    private IGbStreamService gbStreamService;
    @Autowired
    private SubscribeHolder subscribeHolder;
    @Override
    public void onApplicationEvent(CatalogEvent event) {
        SubscribeInfo subscribe = null;
@@ -62,7 +65,8 @@
            parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
            if (parentPlatform != null && !parentPlatform.isStatus())return;
            String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + event.getPlatformId();
            subscribe = redisCatchStorage.getSubscribe(key);
//            subscribe = redisCatchStorage.getSubscribe(key);
            subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
            if (subscribe == null) {
                logger.debug("发送订阅消息时发现订阅信息已经不存在");
@@ -114,7 +118,8 @@
                        if (parentPlatforms != null && parentPlatforms.size() > 0) {
                            for (ParentPlatform platform : parentPlatforms) {
                                String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + platform.getServerGBId();
                                SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
//                                SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
                                SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
                                if (subscribeInfo == null) continue;
                                logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                List<DeviceChannel> deviceChannelList = new ArrayList<>();
@@ -153,8 +158,9 @@
                        List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
                        if (parentPlatforms != null && parentPlatforms.size() > 0) {
                            for (ParentPlatform platform : parentPlatforms) {
                                String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
                                SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
//                                String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
//                                SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
                                SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
                                if (subscribeInfo == null) continue;
                                logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                List<DeviceChannel> deviceChannelList = new ArrayList<>();
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -16,25 +17,28 @@
    private IRedisCatchStorage redisCatchStorage;
    private IVideoManagerStorager storager;
    private ISIPCommanderForPlatform sipCommanderForPlatform;
    private SubscribeHolder subscribeHolder;
    private String platformId;
    private String sn;
    private String key;
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
    public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
        this.redisCatchStorage = redisCatchStorage;
        this.storager = storager;
        this.platformId = platformId;
        this.sn = sn;
        this.key = key;
        this.sipCommanderForPlatform = sipCommanderForPlatform;
        this.subscribeHolder = subscribeInfo;
    }
    @Override
    public void run() {
        SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
        if (subscribe != null) {
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
            if (parentPlatform == null || parentPlatform.isStatus()) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -94,6 +94,7 @@
        Response response = responseEvent.getResponse();
        logger.debug("\n收到响应:\n{}", responseEvent.getResponse());
        int status = response.getStatusCode();
        if (((status >= 200) && (status < 300)) || status == 401) { // Success!
            CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
            String method = cseqHeader.getMethod();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -236,57 +236,57 @@
        return request;
    }
    public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException {
        Request request = null;
        // sipuri
        SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
        // via
        ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
        ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
                parentPlatform.getTransport(), subscribeInfo.getBranch());
        viaHeader.setRPort();
        viaHeaders.add(viaHeader);
        // from
        SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
                parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
        Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
        FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getToTag());
        // to
        SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
        Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
        ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag());
        // Forwards
        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
        // ceq
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
        MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
        // 设置编码, 防止中文乱码
        messageFactory.setDefaultContentEncodingCharset("gb2312");
        request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
                toHeader, viaHeaders, maxForwards);
        List<String> agentParam = new ArrayList<>();
        agentParam.add("wvp-pro");
        UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
        request.addHeader(userAgentHeader);
        EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
        if (subscribeInfo.getEventId() != null) {
            event.setEventId(subscribeInfo.getEventId());
        }
        request.addHeader(event);
        SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active");
        request.setHeader(active);
        String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
        Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
                .createSipURI(parentPlatform.getDeviceGBId(), sipAddress));
        request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
        request.setContent(content, contentTypeHeader);
        return request;
    }
//    public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, String fromTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException {
//        Request request = null;
//        // sipuri
//        SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
//        // via
//        ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
//        ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
//                parentPlatform.getTransport(), viaTag);
//        viaHeader.setRPort();
//        viaHeaders.add(viaHeader);
//        // from
//        SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
//                parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
//        Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
//        FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
//        // to
//        SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
//        Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
//        ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag());
//
//        // Forwards
//        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
//        // ceq
//        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
//        MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
//        // 设置编码, 防止中文乱码
//        messageFactory.setDefaultContentEncodingCharset("gb2312");
//        request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
//                toHeader, viaHeaders, maxForwards);
//        List<String> agentParam = new ArrayList<>();
//        agentParam.add("wvp-pro");
//        UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
//        request.addHeader(userAgentHeader);
//
//        EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
//        if (subscribeInfo.getEventId() != null) {
//            event.setEventId(subscribeInfo.getEventId());
//        }
//
//        request.addHeader(event);
//
//        SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active");
//        request.setHeader(active);
//
//        String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
//        Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
//                .createSipURI(parentPlatform.getDeviceGBId(), sipAddress));
//        request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
//
//        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
//        request.setContent(content, contentTypeHeader);
//        return request;
//    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -215,6 +215,9 @@
        // Event
        EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
        int random = (int)Math.random() * 1000000000;
        eventHeader.setEventId(random + "");
        request.addHeader(eventHeader);
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1518,7 +1518,7 @@
            // 有效时间默认为60秒以上
            Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
                    "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog() + 60, "Catalog" ,
                    "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
                    callIdHeader);
            transmitRequest(device, request, errorEvent, okEvent);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -27,9 +27,7 @@
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ViaHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.header.*;
import javax.sip.message.Request;
import java.lang.reflect.Field;
import java.text.ParseException;
@@ -68,6 +66,9 @@
    @Qualifier(value="udpSipProvider")
    private SipProviderImpl udpSipProvider;
    @Autowired
    private SipFactory sipFactory;
    /**
     * Convenience overload: delegates to the six-argument {@code register} with no call id,
     * no WWW-Authenticate header and {@code registerAgain} set to {@code false}.
     *
     * @param parentPlatform platform to register with
     * @param errorEvent     callback handed to the full overload for the failure case
     * @param okEvent        callback handed to the full overload for the success case
     * @return whatever the six-argument overload returns
     */
    @Override
    public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
        return register(parentPlatform, null, null, errorEvent, okEvent, false);
    }
    public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www,
                            SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) {
        try {
            Request request = null;
            Request request;
            String tm = Long.toString(System.currentTimeMillis());
            if (!registerAgain ) {
                //        //callid
@@ -364,16 +365,18 @@
                    : udpSipProvider.getNewCallId();
            callIdHeader.setCallId(subscribeInfo.getCallId());
            String tm = Long.toString(System.currentTimeMillis());
//
            sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
                logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
            }, null);
            Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform,
                    deviceStatusXml.toString(),callIdHeader,
                    "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""),  subscribeInfo);
            transmitRequest(parentPlatform, request);
        } catch (SipException | ParseException | InvalidArgumentException e) {
        } catch (SipException | ParseException  e) {
            e.printStackTrace();
            return false;
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return true;
    }
@@ -386,36 +389,88 @@
        if (index == null) {
            index = 0;
        }
        if (index >= deviceChannels.size()) {
            return true;
        }
        try {
            if (index > deviceChannels.size() - 1) {
                return true;
            }
            Request request = getCatalogNotifyRequestForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index), deviceChannels.size(), type, subscribeInfo);
            index += 1;
            Integer finalIndex = index;
            transmitRequest(parentPlatform, request, null, (eventResult -> {
                sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex);
            String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index ), deviceChannels.size(), type, subscribeInfo);
            sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
                logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
            }, (eventResult -> {
                sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
            }));
        } catch (SipException | ParseException | InvalidArgumentException e) {
        } catch (SipException | ParseException e) {
            e.printStackTrace();
            return false;
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return true;
    }
    private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type,
                                            SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException,
            PeerUnavailableException {
        String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo);
    /**
     * Sends an in-dialog NOTIFY carrying an XML body to an upper-level platform.
     *
     * <p>The request is created from the dialog stored in {@code subscribeInfo}, given an
     * {@code Application/MANSCDP+xml} body, a {@code Subscription-State: active} header and an
     * Event header built from the subscription's event type (plus event id when present).
     * The request URI host/port are overwritten with the remote address/port of the request held
     * by the subscription's transaction. A client transaction is then created on the TCP or UDP
     * provider, chosen by the platform's transport string, and sent through the dialog.
     *
     * <p>NOTE(review): there is no null check on the dialog or the transaction taken from
     * {@code subscribeInfo}; confirm that every SubscribeInfo passed here carries both.
     *
     * @param parentPlatform    platform whose transport ("TCP"/"UDP") selects the SIP provider
     * @param catalogXmlContent XML text used as the NOTIFY body
     * @param subscribeInfo     subscription supplying the dialog, transaction, call id, event type and event id
     * @param errorEvent        callback registered under the subscription's call id for errors; skipped when null
     * @param okEvent           callback registered under the subscription's call id for success; skipped when null
     * @throws NoSuchFieldException   declared by the signature
     * @throws IllegalAccessException declared by the signature
     * @throws SipException           declared by the signature
     * @throws ParseException         declared by the signature
     */
    private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
                                   SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent,  SipSubscribe.Event okEvent )
            throws NoSuchFieldException, IllegalAccessException, SipException, ParseException {
        // Build the NOTIFY inside the existing subscription dialog.
        Dialog dialog  = subscribeInfo.getDialog();
        Request notifyRequest = dialog.createRequest(Request.NOTIFY);
        // NOTE(review): the next six lines declare a second local named `request` and return a
        // value from a void method; they look like removed-side lines of a diff (the old
        // header-provider based request construction) rather than live code -- confirm against the repository.
        CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                : udpSipProvider.getNewCallId();
        callIdHeader.setCallId(subscribeInfo.getCallId());
        Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent,
                callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
        return request;
        // Attach the XML payload with the Application/MANSCDP+xml content type.
        ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
        notifyRequest.setContent(catalogXmlContent, contentTypeHeader);
        // Mark the subscription as active in the notification.
        SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory()
                .createSubscriptionStateHeader(SubscriptionStateHeader.ACTIVE);
        notifyRequest.addHeader(subscriptionState);
        // Event header mirrors the event type (and id, if any) recorded for the subscription.
        EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
        if (subscribeInfo.getEventId() != null) {
            event.setEventId(subscribeInfo.getEventId());
        }
        notifyRequest.addHeader(event);
        // Point the request URI at the remote address/port of the request stored in the subscription's transaction.
        SipURI sipURI = (SipURI) notifyRequest.getRequestURI();
        SIPRequest request = (SIPRequest) subscribeInfo.getTransaction().getRequest();
        sipURI.setHost(request.getRemoteAddress().getHostName());
        sipURI.setPort(request.getRemotePort());
        // Pick the provider by transport; any value other than "TCP"/"UDP" leaves the transaction null.
        ClientTransaction transaction = null;
        if ("TCP".equals(parentPlatform.getTransport())) {
            transaction = tcpSipProvider.getNewClientTransaction(notifyRequest);
        } else if ("UDP".equals(parentPlatform.getTransport())) {
            transaction = udpSipProvider.getNewClientTransaction(notifyRequest);
        }
        // Register the error callback
        if (errorEvent != null) {
            sipSubscribe.addErrorSubscribe(subscribeInfo.getCallId(), errorEvent);
        }
        // Register the success callback
        if (okEvent != null) {
            sipSubscribe.addOkSubscribe(subscribeInfo.getCallId(), okEvent);
        }
        // The callbacks above are registered before this check, so they remain registered
        // even when the method returns here without sending anything.
        if (transaction == null) {
            logger.error("平台{}的Transport错误:{}",parentPlatform.getServerGBId(), parentPlatform.getTransport());
            return;
        }
        dialog.sendRequest(transaction);
    }
//    private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type,
//                                            SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException,
//            PeerUnavailableException, NoSuchFieldException, IllegalAccessException {
//        String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo);
//
//        CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
//                : udpSipProvider.getNewCallId();
//        callIdHeader.setCallId(subscribeInfo.getCallId());
//        String tm = Long.toString(System.currentTimeMillis());
//
//        Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent,
//                callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""),"FromRegister" + tm, subscribeInfo);
//        return request;
//    }
    private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int sumNum, String type, SubscribeInfo subscribeInfo) {
        StringBuffer catalogXml = new StringBuffer(600);
@@ -465,34 +520,31 @@
        if (index == null) {
            index = 0;
        }
        if (index > deviceChannels.size() - 1) {
        if (index >= deviceChannels.size()) {
            return true;
        }
        try {
            String catalogXml = getCatalogXmlContentForCatalogOther(deviceChannels.get(index), type, parentPlatform);
            CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                        : udpSipProvider.getNewCallId();
                Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml,
                        callIdHeader,
                        "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
                index += 1;
            Integer finalIndex = index;
            transmitRequest(parentPlatform, request, null, eventResult -> {
                sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex);
            });
            String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, deviceChannels.get(index), type);
            sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
                logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
            }, (eventResult -> {
                sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
            }));
        } catch (SipException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return true;
    }
    private String getCatalogXmlContentForCatalogOther(DeviceChannel channel, String type, ParentPlatform parentPlatform) {
    private String getCatalogXmlContentForCatalogOther(ParentPlatform parentPlatform, DeviceChannel channel, String type) {
        if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
            channel.setParentId(parentPlatform.getDeviceGBId());
        }
@@ -594,6 +646,7 @@
                        byte[] transactionByteArray = sendRtpItem.getTransaction();
                        ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
                        Request byeRequest = dialog.createRequest(Request.BYE);
                        SipURI byeURI = (SipURI) byeRequest.getRequestURI();
                        SIPRequest request = (SIPRequest) clientTransaction.getRequest();
                        byeURI.setHost(request.getRemoteAddress().getHostName());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -233,6 +233,7 @@
     */
    private void processNotifyCatalogList(RequestEvent evt) {
        try {
            System.out.println(343434);
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -308,12 +309,6 @@
                    eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase());
                }
                // RequestMessage msg = new RequestMessage();
                // msg.setDeviceId(deviceId);
                // msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
                // msg.setData(device);
                // deferredResultHolder.invokeResult(msg);
                if (offLineDetector.isOnline(deviceId)) {
                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -81,7 +81,7 @@
            String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
            logger.info("[{}] 收到注册请求,开始处理", requestAddress);
            Request request = evt.getRequest();
            ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
            Response response = null; 
            boolean passwordCorrect = false;
            // 注册标志  0:未携带授权头或者密码错误  1:注册成功   2:注销成功
@@ -128,7 +128,7 @@
                    dateHeader.setDate(wvpSipDate);
                    response.addHeader(dateHeader);
                    ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
                    if (expiresHeader == null) {
                        response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
                        ServerTransaction serverTransaction = getServerTransaction(evt);
@@ -193,9 +193,7 @@
            // 保存到redis
            if (registerFlag == 1 ) {
                logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
                // 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
                handler.onRegister(device);
                publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER, expiresHeader.getExpires());
            } else if (registerFlag == 2) {
                logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress);
                publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -15,18 +16,19 @@
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.SipProviderImpl;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.*;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.ToHeader;
import javax.sip.message.Request;
@@ -54,11 +56,25 @@
    @Autowired
    private IVideoManagerStorager storager;
    @Lazy
    @Autowired
    @Qualifier(value="tcpSipProvider")
    private SipProviderImpl tcpSipProvider;
    @Lazy
    @Autowired
    @Qualifier(value="udpSipProvider")
    private SipProviderImpl udpSipProvider;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private UserSetup userSetup;
    @Autowired
    private SubscribeHolder subscribeHolder;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -136,16 +152,17 @@
                .append("</Response>\r\n");
        if (subscribeInfo.getExpires() > 0) {
            if (redisCatchStorage.getSubscribe(key) != null) {
            if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
                dynamicTask.stop(key);
            }
            String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
            dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key), Integer.parseInt(interval));
            redisCatchStorage.updateSubscribe(key, subscribeInfo);
            dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
            subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
        }else if (subscribeInfo.getExpires() == 0) {
            dynamicTask.stop(key);
            redisCatchStorage.delSubscribe(key);
//            redisCatchStorage.delSubscribe(key);
            subscribeHolder.removeMobilePositionSubscribe(platformId);
        }
        try {
@@ -168,10 +185,19 @@
    }
    private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
    private void processNotifyCatalogList(RequestEvent evt, Element rootElement) throws SipException {
        String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
        String deviceID = XmlUtil.getText(rootElement, "DeviceID");
        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
        SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
        if (evt.getServerTransaction() == null) {
            ServerTransaction serverTransaction = platform.getTransport().equals("TCP") ? tcpSipProvider.getNewServerTransaction(evt.getRequest())
                    : udpSipProvider.getNewServerTransaction(evt.getRequest());
            subscribeInfo.setTransaction(serverTransaction);
            Dialog dialog = serverTransaction.getDialog();
            dialog.terminateOnBye(false);
            subscribeInfo.setDialog(dialog);
        }
        String sn = XmlUtil.getText(rootElement, "SN");
        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() +  "_Catalog_" + platformId;
        logger.info("接收到{}的Catalog订阅", platformId);
@@ -185,9 +211,11 @@
                .append("</Response>\r\n");
        if (subscribeInfo.getExpires() > 0) {
            redisCatchStorage.updateSubscribe(key, subscribeInfo);
//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
            subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
        }else if (subscribeInfo.getExpires() == 0) {
            redisCatchStorage.delSubscribe(key);
//            redisCatchStorage.delSubscribe(key);
            subscribeHolder.removeCatalogSubscribe(platformId);
        }
        try {
@@ -195,7 +223,8 @@
            Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform);
            ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
            subscribeInfo.setToTag(toHeader.getTag());
            redisCatchStorage.updateSubscribe(key, subscribeInfo);
//            redisCatchStorage.updateSubscribe(key, subscribeInfo);
            subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
        } catch (SipException e) {
            e.printStackTrace();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
@@ -21,6 +21,7 @@
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -64,6 +65,11 @@
        String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
        CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
        // 查询设备是否存在
        CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME);
        String method = cseqHeader.getMethod();
        if (method.equals("MESSAGE")) {
            System.out.println();
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        // 查询上级平台是否存在
        ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -85,41 +85,54 @@
                return;
            }
            int sumNum = Integer.parseInt(sumNumElement.getText());
            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            if (deviceListIterator != null) {
                List<DeviceChannel> channelList = new ArrayList<>();
                // 遍历DeviceList
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
            if (sumNum == 0) {
                // 数据已经完整接收
                storager.cleanChannelsForDevice(device.getDeviceId());
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                WVPResult<Object> result = new WVPResult<>();
                result.setCode(0);
                result.setData(device);
                msg.setData(result);
                result.setMsg("更新成功,共0条");
                deferredResultHolder.invokeAllResult(msg);
                catalogDataCatch.del(key);
            }else {
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
                    List<DeviceChannel> channelList = new ArrayList<>();
                    // 遍历DeviceList
                    while (deviceListIterator.hasNext()) {
                        Element itemDevice = deviceListIterator.next();
                        Element channelDeviceElement = itemDevice.element("DeviceID");
                        if (channelDeviceElement == null) {
                            continue;
                        }
                        DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
                        deviceChannel.setDeviceId(device.getDeviceId());
                        logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
                        channelList.add(deviceChannel);
                    }
                    DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
                    deviceChannel.setDeviceId(device.getDeviceId());
                    logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
                    channelList.add(deviceChannel);
                }
                catalogDataCatch.put(key, sumNum, device, channelList);
                if (catalogDataCatch.get(key).size() == sumNum) {
                    // 数据已经完整接收
                    boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
                    RequestMessage msg = new RequestMessage();
                    msg.setKey(key);
                    WVPResult<Object> result = new WVPResult<>();
                    result.setCode(0);
                    result.setData(device);
                    if (resetChannelsResult) {
                        result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
                    }else {
                        result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
                    catalogDataCatch.put(key, sumNum, device, channelList);
                    if (catalogDataCatch.get(key).size() == sumNum) {
                        // 数据已经完整接收
                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
                        RequestMessage msg = new RequestMessage();
                        msg.setKey(key);
                        WVPResult<Object> result = new WVPResult<>();
                        result.setCode(0);
                        result.setData(device);
                        if (resetChannelsResult || sumNum ==0) {
                            result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
                        }else {
                            result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
                        }
                        msg.setData(result);
                        deferredResultHolder.invokeAllResult(msg);
                        catalogDataCatch.del(key);
                    }
                    msg.setData(result);
                    deferredResultHolder.invokeAllResult(msg);
                    catalogDataCatch.del(key);
                }
                // 回复200 OK
                responseAck(evt, Response.OK);
                if (offLineDetector.isOnline(device.getDeviceId())) {
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -31,8 +31,8 @@
            return false;
        }
        if (dynamicTask.contains(device.getDeviceId())) {
            logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId());
            return false;
            // 存在则停止现有的,开启新的
            dynamicTask.stop(device.getDeviceId());
        }
        logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
        // 添加目录订阅
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -513,6 +513,14 @@
        param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
        param.put("hook.timeoutSec","20");
        param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
        // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
        // 置0关闭此特性(推流断开会导致立即断开播放器)
        // 此参数不应大于播放器超时时间
        // 优化此消息以更快的收到流注销事件
        param.put("general.continue_push_ms", "3000" );
        // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
        // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
        param.put("general.wait_track_ready_ms", "3000" );
        JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
@@ -620,6 +628,8 @@
    public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
        MediaServerItem mediaServerItem = getOne(mediaServerId);
        if (mediaServerItem == null) {
            // zlm连接重试
            logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
            return;
        }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -1,14 +1,12 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -23,6 +21,8 @@
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@@ -32,6 +32,8 @@
@Service
public class StreamPushServiceImpl implements IStreamPushService {
    private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
    @Autowired
    private GbStreamMapper gbStreamMapper;
@@ -158,12 +160,17 @@
    /**
     * Removes a stream's GB record: sends a {@code CatalogEvent.DEL} catalog message for the
     * stream, deletes its entries through {@code platformGbStreamMapper} and {@code gbStreamMapper},
     * then queries the media server for the stream and deletes the stream-push entry depending on
     * the answer.
     *
     * <p>NOTE(review): this block shows two declarations of {@code del} and an
     * {@code if (mediaList == null) {} that is never closed; these look like the old and new
     * sides of a diff interleaved without markers -- confirm which lines are live.
     *
     * @param stream the stream to remove; its app, stream id and media server id are read
     * @return {@code true} when {@code gbStreamMapper.del} returned a value greater than zero
     */
    public boolean removeFromGB(GbStream stream) {
        // Decide whether an event needs to be sent
        gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
        int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
        platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
        int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
        // Ask the media server whether it still lists this app/stream.
        MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
        // Variant 1: delete the push record only when no response object came back.
        if (mediaList == null) {
            streamPushMapper.del(stream.getApp(), stream.getStream());
        // Variant 2: delete the push record only when the response has code 0 and no "data" array.
        // NOTE(review): getInteger presumably returns a boxed Integer, so a response without a
        // "code" key would fail on unboxing in the comparison below -- confirm.
        if (mediaList != null) {
            if (mediaList.getInteger("code") == 0) {
                JSONArray data = mediaList.getJSONArray("data");
                if (data == null) {
                    streamPushMapper.del(stream.getApp(), stream.getStream());
                }
            }
        }
        return del > 0;
    }
@@ -180,9 +187,9 @@
        StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
        gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
        int delStream = streamPushMapper.del(app, streamId);
        gbStreamMapper.del(app, streamId);
        platformGbStreamMapper.delByAppAndStream(app, streamId);
        gbStreamMapper.del(app, streamId);
        int delStream = streamPushMapper.del(app, streamId);
        if (delStream > 0) {
            MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
            zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
@@ -376,6 +383,29 @@
                .collect(Collectors.toList());
        if (streamPushItemsForPlatform.size() > 0) {
            // 获取所有平台,平台和目录信息一般不会特别大量。
            List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
            Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
            if (parentPlatformList.size() == 0) {
                return;
            }
            for (ParentPlatform platform : parentPlatformList) {
                Map<String, PlatformCatalog> catalogMap = new HashMap<>();
                // 创建根节点
                PlatformCatalog platformCatalog = new PlatformCatalog();
                platformCatalog.setId(platform.getServerGBId());
                catalogMap.put(platform.getServerGBId(), platformCatalog);
                // 查询所有节点信息
                List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
                if (platformCatalogs.size() > 0) {
                    for (PlatformCatalog catalog : platformCatalogs) {
                        catalogMap.put(catalog.getId(), catalog);
                    }
                }
                platformInfoMap.put(platform.getServerGBId(), catalogMap);
            }
            List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
            Map<String, List<GbStream>> platformForEvent = new HashMap<>();
            // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
@@ -388,6 +418,12 @@
                            streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
                            if (platFormInfoArray.length > 0) {
                                // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
                                // 不存在这个平台,则忽略导入此关联关系
                                if (platformInfoMap.get(platFormInfoArray[0]) == null
                                        || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
                                    logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
                                    continue;
                                }
                                streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
                                List<GbStream> gbStreamList = platformForEvent.get(streamPushItem.getPlatformId());
@@ -406,8 +442,6 @@
                                streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
                            }
                            streamPushItemListFroPlatform.add(streamPushItemForPlatform);
                        }
                    }
@@ -432,9 +466,9 @@
        }
        gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
        int delStream = streamPushMapper.delAllForGbStream(gbStreams);
        gbStreamMapper.batchDelForGbStream(gbStreams);
        platformGbStreamMapper.delByGbStreams(gbStreams);
        gbStreamMapper.batchDelForGbStream(gbStreams);
        int delStream = streamPushMapper.delAllForGbStream(gbStreams);
        if (delStream > 0) {
            for (GbStream gbStream : gbStreams) {
                MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());