648540858
2021-12-14 0c10e8d9d3ca01fb31f632560f6089f5d2b1d585
优化info消息的cseq计数
11个文件已修改
1个文件已删除
143 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/runner/SipDeviceRunner.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -56,6 +56,8 @@
    public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
    public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
    //************************** redis 消息*********************************
    public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
src/main/java/com/genersoft/iot/vmp/conf/runner/SipDeviceRunner.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.conf.runner;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.beans.factory.annotation.Autowired;
@@ -23,6 +25,9 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private UserSetup userSetup;
    @Override
    public void run(String... args) throws Exception {
        // 读取redis没有心跳信息的则设置为离线,等收到下次心跳设置为在线
@@ -32,7 +37,8 @@
        for (String deviceId : onlineForAll) {
            storager.online(deviceId);
        }
        // 重置cseq计数
        redisCatchStorage.resetAllCSEQ();
        // TODO 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -14,7 +14,7 @@
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -34,6 +34,9 @@
    
    @Autowired
    private SipFactory sipFactory;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private VideoStreamSessionManager streamSession;
@@ -195,6 +198,7 @@
        // Forwards
        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
        // ceq
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.SUBSCRIBE);
@@ -218,7 +222,7 @@
        return request;
    }
    public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
    public Request createInfoRequest(Device device, StreamInfo streamInfo, String content, Long cseq)
            throws PeerUnavailableException, ParseException, InvalidArgumentException {
        Request request = null;
        Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId());
@@ -247,10 +251,12 @@
        // Forwards
        MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
        if (cseq == null) {
            cseq = redisCatchStorage.getCSEQ(Request.INFO);
        }
        // ceq
        CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
                .createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO);
                .createCSeqHeader(cseq, Request.INFO);
        request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
                fromHeader, toHeader, viaHeaders, maxForwards);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -18,7 +18,6 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
@@ -1553,12 +1552,12 @@
    @Override
    public void playPauseCmd(Device device, StreamInfo streamInfo) {
        try {
            Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
            StringBuffer content = new StringBuffer(200);
            content.append("PAUSE RTSP/1.0\r\n");
            content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
            content.append("CSeq: " + cseq + "\r\n");
            content.append("PauseTime: now\r\n");
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
            logger.info(request.toString());
            ClientTransaction clientTransaction = null;
            if ("TCP".equals(device.getTransport())) {
@@ -1581,11 +1580,12 @@
    @Override
    public void playResumeCmd(Device device, StreamInfo streamInfo) {
        try {
            Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
            StringBuffer content = new StringBuffer(200);
            content.append("PLAY RTSP/1.0\r\n");
            content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
            content.append("CSeq: " + cseq + "\r\n");
            content.append("Range: npt=now-\r\n");
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
            logger.info(request.toString());
            ClientTransaction clientTransaction = null;
            if ("TCP".equals(device.getTransport())) {
@@ -1607,12 +1607,13 @@
    @Override
    public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
        try {
            Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
            StringBuffer content = new StringBuffer(200);
            content.append("PLAY RTSP/1.0\r\n");
            content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
            content.append("CSeq: " + cseq + "\r\n");
            content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n");
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
            logger.info(request.toString());
            ClientTransaction clientTransaction = null;
            if ("TCP".equals(device.getTransport())) {
@@ -1634,11 +1635,12 @@
    @Override
    public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) {
        try {
            Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
            StringBuffer content = new StringBuffer(200);
            content.append("PLAY RTSP/1.0\r\n");
            content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
            content.append("CSeq: " + cseq + "\r\n");
            content.append("Scale: " + String.format("%.1f",speed) + "\r\n");
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
            Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
            logger.info(request.toString());
            ClientTransaction clientTransaction = null;
            if ("TCP".equals(device.getTransport())) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -89,7 +89,7 @@
                });
        // 获取zlm信息
        logger.info("等待默认zlm接入...");
        logger.info("[zlm接入]等待默认zlm中...");
        // 获取所有的zlm, 并开启主动连接
        List<MediaServerItem> all = mediaServerService.getAllFromDatabase();
src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java
@@ -25,24 +25,28 @@
        sipCommander.catalogSubscribe(device, eventResult -> {
            ResponseEvent event = (ResponseEvent) eventResult.event;
            Element rootElement = null;
            try {
                rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
            } catch (DocumentException e) {
                e.printStackTrace();
            }
            Element resultElement = rootElement.element("Result");
            String result = resultElement.getText();
            if (result.toUpperCase().equals("OK")){
                // 成功
                logger.info("目录订阅成功: {}", device.getDeviceId());
            if (event.getResponse().getRawContent() != null) {
                try {
                    rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
                } catch (DocumentException e) {
                    e.printStackTrace();
                }
                Element resultElement = rootElement.element("Result");
                String result = resultElement.getText();
                if (result.toUpperCase().equals("OK")){
                    // 成功
                    logger.info("[目录订阅]成功: {}", device.getDeviceId());
                }else {
                    // 失败
                    logger.info("[目录订阅]失败: {}-{}", device.getDeviceId(), result);
                }
            }else {
                // 失败
                logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
                // 成功
                logger.info("[目录订阅]成功: {}", device.getDeviceId());
            }
        },eventResult -> {
            // 失败
            logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
            logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
        });
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -51,6 +51,8 @@
        dynamicTask.stopCron(device.getDeviceId());
        device.setSubscribeCycleForCatalog(0);
        sipCommander.catalogSubscribe(device, null, null);
        // 清空cseq计数
        return true;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -83,7 +83,7 @@
     */
    @Override
    public void run(String... args) throws Exception {
        logger.info("Media Server 缓存初始化");
        logger.info("[缓存初始化] Media Server ");
        List<MediaServerItem> mediaServerItemList = mediaServerMapper.queryAll();
        for (MediaServerItem mediaServerItem : mediaServerItemList) {
            if (StringUtils.isEmpty(mediaServerItem.getId())) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -15,6 +15,14 @@
public interface IRedisCatchStorage {
    /**
     * 计数器。为cseq进行计数
     *
     * @param method sip 方法
     * @return
     */
    Long getCSEQ(String method);
    /**
     * 开始播放时将流存入
     *
     * @param stream 流信息
@@ -181,4 +189,6 @@
     * 获取Device
     */
    Device getDevice(String deviceId);
    void resetAllCSEQ();
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -36,6 +36,28 @@
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    public Long getCSEQ(String method) {
        String key = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_" +  method;
        long result =  redis.incr(key, 1L);
        if (result > Integer.MAX_VALUE) {
            redis.set(key, 1);
            result = 1;
        }
        return result;
    }
    @Override
    public void resetAllCSEQ() {
        String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_*";
        List<Object> keys = redis.scan(scanKey);
        for (int i = 0; i < keys.size(); i++) {
            String key = (String) keys.get(i);
            redis.set(key, 1);
        }
    }
    /**
     * 开始播放时将流存入redis
     *
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java
@@ -9,7 +9,6 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@@ -31,7 +30,6 @@
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.util.UUID;
@Api(tags = "视频回放")
@@ -168,7 +166,6 @@
            logger.warn("streamId不存在!");
            return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
        }
        setCseq(streamId);
        Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
        cmder.playPauseCmd(device, streamInfo);
        json.put("msg", "ok");
@@ -189,7 +186,6 @@
            logger.warn("streamId不存在!");
            return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
        }
        setCseq(streamId);
        Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
        cmder.playResumeCmd(device, streamInfo);
        json.put("msg", "ok");
@@ -211,7 +207,6 @@
            logger.warn("streamId不存在!");
            return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
        }
        setCseq(streamId);
        Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
        cmder.playSeekCmd(device, streamInfo, seekTime);
        json.put("msg", "ok");
@@ -238,18 +233,10 @@
            logger.warn("不支持的speed: " + speed);
            return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
        }
        setCseq(streamId);
        Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
        cmder.playSpeedCmd(device, streamInfo, speed);
        json.put("msg", "ok");
        return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
    }
    public void setCseq(String streamId) {
        if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) {
            InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1);
        } else {
            InfoCseqCache.CSEQCACHE.put(streamId, 2L);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java
File was deleted