src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -56,6 +56,8 @@ public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_"; public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_"; //************************** redis 消息********************************* public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_"; src/main/java/com/genersoft/iot/vmp/conf/runner/SipDeviceRunner.java
@@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.conf.runner; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.springframework.beans.factory.annotation.Autowired; @@ -23,6 +25,9 @@ @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private UserSetup userSetup; @Override public void run(String... args) throws Exception { // 读取redis没有心跳信息的则设置为离线,等收到下次心跳设置为在线 @@ -32,7 +37,8 @@ for (String deviceId : onlineForAll) { storager.online(deviceId); } // 重置cseq计数 redisCatchStorage.resetAllCSEQ(); // TODO 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -14,7 +14,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -34,6 +34,9 @@ @Autowired private SipFactory sipFactory; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private VideoStreamSessionManager streamSession; @@ -195,6 +198,7 @@ // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.SUBSCRIBE); @@ -218,7 +222,7 @@ return request; } public Request createInfoRequest(Device device, StreamInfo streamInfo, String content) public Request createInfoRequest(Device device, StreamInfo streamInfo, String content, Long cseq) throws PeerUnavailableException, ParseException, InvalidArgumentException { Request request = null; Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId()); @@ -247,10 +251,12 @@ // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); if (cseq == null) { cseq = redisCatchStorage.getCSEQ(Request.INFO); } // ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory() .createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO); .createCSeqHeader(cseq, Request.INFO); request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -18,7 +18,6 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.message.SIPRequest; @@ -1553,12 +1552,12 @@ @Override public void playPauseCmd(Device device, StreamInfo streamInfo) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PAUSE RTSP/1.0\r\n"); content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("PauseTime: now\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { @@ -1581,11 +1580,12 @@ @Override public void playResumeCmd(Device device, StreamInfo streamInfo) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PLAY RTSP/1.0\r\n"); content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("Range: npt=now-\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { @@ -1607,12 +1607,13 @@ @Override public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) { try { Long cseq = 
redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PLAY RTSP/1.0\r\n"); content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { @@ -1634,11 +1635,12 @@ @Override public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PLAY RTSP/1.0\r\n"); content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("Scale: " + String.format("%.1f",speed) + "\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -89,7 +89,7 @@ }); // 获取zlm信息 logger.info("等待默认zlm接入..."); logger.info("[zlm接入]等待默认zlm中..."); // 获取所有的zlm, 并开启主动连接 List<MediaServerItem> all = mediaServerService.getAllFromDatabase(); src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java
@@ -25,24 +25,28 @@ sipCommander.catalogSubscribe(device, eventResult -> { ResponseEvent event = (ResponseEvent) eventResult.event; Element rootElement = null; try { rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312"); } catch (DocumentException e) { e.printStackTrace(); } Element resultElement = rootElement.element("Result"); String result = resultElement.getText(); if (result.toUpperCase().equals("OK")){ // 成功 logger.info("目录订阅成功: {}", device.getDeviceId()); if (event.getResponse().getRawContent() != null) { try { rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312"); } catch (DocumentException e) { e.printStackTrace(); } Element resultElement = rootElement.element("Result"); String result = resultElement.getText(); if (result.toUpperCase().equals("OK")){ // 成功 logger.info("[目录订阅]成功: {}", device.getDeviceId()); }else { // 失败 logger.info("[目录订阅]失败: {}-{}", device.getDeviceId(), result); } }else { // 失败 logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result); // 成功 logger.info("[目录订阅]成功: {}", device.getDeviceId()); } },eventResult -> { // 失败 logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId()); logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); }); } } src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -51,6 +51,8 @@ dynamicTask.stopCron(device.getDeviceId()); device.setSubscribeCycleForCatalog(0); sipCommander.catalogSubscribe(device, null, null); // 清空cseq计数 return true; } } src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -83,7 +83,7 @@ */ @Override public void run(String... args) throws Exception { logger.info("Media Server 缓存初始化"); logger.info("[缓存初始化] Media Server "); List<MediaServerItem> mediaServerItemList = mediaServerMapper.queryAll(); for (MediaServerItem mediaServerItem : mediaServerItemList) { if (StringUtils.isEmpty(mediaServerItem.getId())) { src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -15,6 +15,14 @@ public interface IRedisCatchStorage {
    /**
     * Counter that supplies CSeq values.
     * <p>
     * The Redis-backed implementation shown in this change keeps one counter per
     * server id and SIP method, increments it on every call, and sets it back to 1
     * once the incremented value exceeds {@code Integer.MAX_VALUE}.
     *
     * @param method the SIP method the CSeq is counted for
     * @return the CSeq value to use for the next request of that method
     */
    Long getCSEQ(String method);
    /** * Store the stream when playback starts * * @param stream stream information @@ -181,4 +189,6 @@ * Get the Device */ Device getDevice(String deviceId);
    /**
     * Resets the CSeq counters belonging to this server.
     * <p>
     * The Redis-backed implementation shown in this change looks up every key that
     * starts with the CSeq prefix followed by this server's id and sets each one to 1.
     * SipDeviceRunner invokes this from its startup {@code run} method.
     */
    void resetAllCSEQ(); }
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -36,6 +36,28 @@ private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Hands out the next CSeq value for the given SIP method.
     * <p>
     * The counter lives in redis under a key built from the CSeq prefix, this
     * server's id and the method name. When the incremented value exceeds
     * {@link Integer#MAX_VALUE}, the stored counter is set back to 1 and 1 is returned.
     *
     * @param method SIP method name, used as the key suffix
     * @return the incremented counter value, or 1 right after a wrap-around
     */
    @Override
    public Long getCSEQ(String method) {
        final String counterKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_" + method;
        final long next = redis.incr(counterKey, 1L);
        if (next <= Integer.MAX_VALUE) {
            return next;
        }
        // Beyond the int range: restart the stored counter and report the restart value.
        redis.set(counterKey, 1);
        return 1L;
    }

    /**
     * Sets every CSeq counter of this server back to 1.
     * <p>
     * Keys are found by scanning redis for the CSeq prefix followed by this
     * server's id and a wildcard suffix, which covers all SIP methods.
     */
    @Override
    public void resetAllCSEQ() {
        final String pattern = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_*";
        for (Object matched : redis.scan(pattern)) {
            redis.set((String) matched, 1);
        }
    }

    /** * Store the stream in redis when playback starts *
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java
@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -31,7 +30,6 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.message.Response; import java.util.UUID; @Api(tags = "视频回放") @@ -168,7 +166,6 @@ logger.warn("streamId不存在!"); return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST); } setCseq(streamId); Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playPauseCmd(device, streamInfo); json.put("msg", "ok"); @@ -189,7 +186,6 @@ logger.warn("streamId不存在!"); return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST); } setCseq(streamId); Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playResumeCmd(device, streamInfo); json.put("msg", "ok"); @@ -211,7 +207,6 @@ logger.warn("streamId不存在!"); return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST); } setCseq(streamId); Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playSeekCmd(device, streamInfo, seekTime); json.put("msg", "ok"); @@ -238,18 +233,10 @@ logger.warn("不支持的speed: " + speed); return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST); } setCseq(streamId); Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playSpeedCmd(device, streamInfo, speed); json.put("msg", "ok"); return new ResponseEntity<String>(json.toString(), HttpStatus.OK); } public void setCseq(String streamId) { if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) { InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1); } else { 
InfoCseqCache.CSEQCACHE.put(streamId, 2L); } } } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java
File was deleted