src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -77,38 +77,54 @@ //************************** redis 消息********************************* // 流变化的通知 /** * 流变化的通知 */ public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_"; // 接收推流设备的GPS变化通知 /** * 接收推流设备的GPS变化通知 */ public static final String VM_MSG_GPS = "VM_MSG_GPS"; // 接收推流设备的GPS变化通知 /** * 接收推流设备的GPS变化通知 */ public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE"; // redis 消息通知设备推流到平台 /** * redis 消息通知设备推流到平台 */ public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED"; // redis 消息请求所有的在线通道 /** * redis 消息请求所有的在线通道 */ public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED"; // 移动位置订阅通知 /** * 移动位置订阅通知 */ public static final String VM_MSG_SUBSCRIBE_MOBILE_POSITION = "mobileposition"; // 报警订阅的通知(收到报警向redis发出通知) /** * 报警订阅的通知(收到报警向redis发出通知) */ public static final String VM_MSG_SUBSCRIBE_ALARM = "alarm"; // 报警通知的发送 (收到redis发出的通知,转发给其他平台) /** * 报警通知的发送 (收到redis发出的通知,转发给其他平台) */ public static final String VM_MSG_SUBSCRIBE_ALARM_RECEIVE= "alarm_receive"; // 设备状态订阅的通知 /** * 设备状态订阅的通知 */ public static final String VM_MSG_SUBSCRIBE_DEVICE_STATUS = "device"; //************************** 第三方 **************************************** public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -62,7 +62,7 @@ // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE); request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); @@ -120,7 +120,7 @@ String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException { Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader); Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader); SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); if (www == null) { AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest"); @@ -213,7 +213,7 @@ // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE); MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); // 设置编码, 防止中文乱码 messageFactory.setDefaultContentEncodingCharset(parentPlatform.getCharacterSet()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -2,11 +2,9 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.List; import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.*; @@ -15,7 +13,11 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.stack.SIPDialog; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; @@ -40,6 +42,14 @@ @Autowired private VideoStreamSessionManager streamSession; @Autowired @Qualifier(value="tcpSipProvider") private SipProviderImpl tcpSipProvider; @Autowired @Qualifier(value="udpSipProvider") private SipProviderImpl udpSipProvider; public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; @@ -95,7 +105,7 @@ MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); //ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.INVITE), Request.INVITE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); @@ -131,7 +141,7 @@ MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); //ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.INVITE), Request.INVITE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); @@ -200,7 +210,7 @@ MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.SUBSCRIBE), Request.SUBSCRIBE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.SUBSCRIBE); request = sipFactory.createMessageFactory().createRequest(requestURI, Request.SUBSCRIBE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); @@ -226,55 +236,55 @@ } public Request createInfoRequest(Device device, StreamInfo streamInfo, String content) throws PeerUnavailableException, ParseException, InvalidArgumentException { Request request = null; throws SipException, ParseException, InvalidArgumentException { if (streamInfo == null) { return null; } Dialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); Request request = null; SIPDialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); if (dialog == null) { return null; } SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); // via ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(), device.getTransport(), null); SipStack sipStack = udpSipProvider.getSipStack(); SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog); if (dialog != sipDialog) { dialog = sipDialog; }else { dialog.setSipProvider(udpSipProvider); } streamSession.put(streamInfo.getDeviceID(), streamInfo.getChannelId(), dialog.getCallId().getCallId(), dialog); Request infoRequest = dialog.createRequest(Request.INFO); SipURI sipURI = (SipURI) infoRequest.getRequestURI(); sipURI.setHost(device.getIp()); sipURI.setPort(device.getPort()); sipURI.setUser(streamInfo.getChannelId()); ViaHeader viaHeader = (ViaHeader) infoRequest.getHeader(ViaHeader.NAME); viaHeader.setRPort(); viaHeaders.add(viaHeader); // from SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain()); Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag()); // to SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(), sipConfig.getDomain()); Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag()); // callid CallIdHeader callIdHeader = dialog.getCallId(); // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); Long cseq = redisCatchStorage.getCSEQ(Request.INVITE); // ceq CSeqHeader cSeqHeader = sipFactory.createHeaderFactory() .createCSeqHeader(cseq, Request.INFO); request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); // 增加Contact header Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() .createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort())); request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); infoRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); List<String> agentParam = new ArrayList<>(); agentParam.add("wvp-pro"); // TODO 添加版本信息以及日期 UserAgentHeader userAgentHeader = null; try { userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); } catch (ParseException e) { throw new RuntimeException(e); } infoRequest.addHeader(userAgentHeader); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSRTSP"); request.setContent(content, contentTypeHeader); return request; infoRequest.setContent(content, contentTypeHeader); CSeqHeader cSeqHeader = (CSeqHeader)infoRequest.getHeader(CSeqHeader.NAME); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ()); // ceq infoRequest.addHeader(cSeqHeader); return infoRequest; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -732,8 +732,23 @@ SIPRequest request = (SIPRequest)transaction.getRequest(); byeURI.setHost(request.getRemoteAddress().getHostAddress()); byeURI.setPort(request.getRemotePort()); byeURI.setUser(channelId); ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME); String protocol = viaHeader.getTransport().toUpperCase(); viaHeader.setRPort(); // 增加Contact header Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); byeRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); List<String> agentParam = new ArrayList<>(); agentParam.add("wvp-pro"); // TODO 添加版本信息以及日期 UserAgentHeader userAgentHeader = null; try { userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); } catch (ParseException e) { throw new RuntimeException(e); } byeRequest.addHeader(userAgentHeader); ClientTransaction clientTransaction = null; if("TCP".equals(protocol)) { clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); @@ -745,11 +760,14 @@ if (okEvent != null) { sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); } CSeqHeader cSeqHeader = (CSeqHeader)byeRequest.getHeader(CSeqHeader.NAME); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ()); dialog.sendRequest(clientTransaction); } catch (SipException | ParseException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } } @@ -1483,7 +1501,7 @@ request.setContent(subscribePostitionXml.toString(), contentTypeHeader); CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ(Request.SUBSCRIBE)); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ()); request.removeHeader(CSeqHeader.NAME); request.addHeader(cSeqHeader); }else { @@ -1587,7 +1605,7 @@ request.setContent(cmdXml.toString(), contentTypeHeader); CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ(Request.SUBSCRIBE)); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ()); request.removeHeader(CSeqHeader.NAME); request.addHeader(cSeqHeader); @@ -1697,10 +1715,9 @@ @Override public void playPauseCmd(Device device, StreamInfo streamInfo) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PAUSE RTSP/1.0\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n"); content.append("PauseTime: now\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); if (request == null) { @@ -1728,10 +1745,9 @@ @Override public void playResumeCmd(Device device, StreamInfo streamInfo) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PLAY RTSP/1.0\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n"); content.append("Range: npt=now-\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); if (request == null) { @@ -1758,10 +1774,9 @@ @Override public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PLAY RTSP/1.0\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n"); content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); @@ -1789,11 +1804,11 @@ @Override public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) { try { Long cseq = redisCatchStorage.getCSEQ(Request.INFO); StringBuffer content = new StringBuffer(200); content.append("PLAY RTSP/1.0\r\n"); content.append("CSeq: " + cseq + "\r\n"); content.append("Scale: " + String.format("%.1f",speed) + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n"); content.append("Scale: " + String.format("%.6f",speed) + "\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); if (request == null) { return; @@ -1812,6 +1827,10 @@ e.printStackTrace(); } } private int getInfoCseq() { return (int) ((Math.random() * 9 + 1) * Math.pow(10, 8)); } @Override public void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) { @@ -1820,7 +1839,6 @@ if (request == null) { return; } logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { clientTransaction = tcpSipProvider.getNewClientTransaction(request); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -105,7 +105,7 @@ } request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, redisCatchStorage.getCSEQ(), "FromRegister" + tm, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader); // 将 callid 写入缓存, 等注册成功可以更新状态 String callIdFromHeader = callIdHeader.getCallId(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
@@ -2,24 +2,32 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; import gov.nist.javax.sip.ResponseEventExt; import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sdp.SdpFactory; import javax.sdp.SdpParseException; import javax.sdp.SessionDescription; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.util.ArrayList; import java.util.List; /** @@ -34,14 +42,16 @@ private final String method = "INVITE"; @Autowired private SipLayer sipLayer; @Autowired private SipConfig config; private VideoStreamSessionManager streamSession; @Autowired private SIPProcessorObserver sipProcessorObserver; @Autowired private SipConfig sipConfig; @Autowired private SipFactory sipFactory; @Override public void afterPropertiesSet() throws Exception { @@ -49,8 +59,7 @@ sipProcessorObserver.addResponseProcessor(method, this); } @Autowired private VideoStreamSessionManager streamSession; /** * 处理invite响应 @@ -74,6 +83,19 @@ CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME); Request reqAck = dialog.createAck(cseq.getSeqNumber()); SipURI requestURI = (SipURI) reqAck.getRequestURI(); String contentString = new String(response.getRawContent()); // jainSip不支持y=字段, 移除以解析。 int ssrcIndex = contentString.indexOf("y="); // 检查是否有y字段 SessionDescription sdp; if (ssrcIndex >= 0) { //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 String substring = contentString.substring(0, contentString.indexOf("y=")); sdp = SdpFactory.getInstance().createSessionDescription(substring); } else { sdp = SdpFactory.getInstance().createSessionDescription(contentString); } requestURI.setUser(sdp.getOrigin().getUsername()); try { requestURI.setHost(event.getRemoteIpAddress()); } catch (ParseException e) { @@ -81,6 +103,18 @@ } requestURI.setPort(event.getRemotePort()); reqAck.setRequestURI(requestURI); List<String> agentParam = new ArrayList<>(); agentParam.add("wvp-pro"); // TODO 添加版本信息以及日期 UserAgentHeader userAgentHeader = null; try { userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); } catch (ParseException e) { throw new RuntimeException(e); } reqAck.addHeader(userAgentHeader); Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); reqAck.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); logger.info("[回复ack] {}-> {}:{} ",requestURI, event.getRemoteIpAddress(), event.getRemotePort()); dialog.sendAck(reqAck); @@ -88,6 +122,10 @@ } } catch (InvalidArgumentException | SipException e) { e.printStackTrace(); } catch (ParseException e) { throw new RuntimeException(e); } catch (SdpParseException e) { throw new RuntimeException(e); } } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -98,9 +98,7 @@ @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public ResponseEntity<String> onServerKeepalive(@RequestBody JSONObject json){ if (logger.isDebugEnabled()) { logger.debug("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString()); } logger.info("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString()); String mediaServerId = json.getString("mediaServerId"); List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); if (subscribes != null && subscribes.size() > 0) { src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -277,13 +277,7 @@ return null; } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key); if(null==serverItem){ //zlm服务不在线,启动重连 reloadZlm(); serverItem=(MediaServerItem)redisUtil.get(key); } return serverItem; return (MediaServerItem)redisUtil.get(key); } @Override @@ -412,7 +406,6 @@ } redisUtil.set(key, serverItem); resetOnlineServerItem(serverItem); updateMediaServerKeepalive(serverItem.getId(), null); if (serverItem.isAutoConfig()) { setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); } @@ -476,9 +469,6 @@ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { logger.info("获取负载最低的节点时无在线节点,启动重连机制"); //启动重连 reloadZlm(); if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { logger.info("获取负载最低的节点时无在线节点"); return null; @@ -643,6 +633,11 @@ public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { MediaServerItem mediaServerItem = getOne(mediaServerId); if (mediaServerItem == null) { // 缓存不存在,从数据库查询,如果数据库不存在则是错误的 MediaServerItem mediaServerItemFromDatabase = getOneFromDatabase(mediaServerId); if (mediaServerItemFromDatabase == null) { return; } // zlm连接重试 logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm"); reloadZlm(); @@ -658,6 +653,10 @@ redisUtil.set(key, data, hookAliveInterval); } private MediaServerItem getOneFromDatabase(String mediaServerId) { return mediaServerMapper.queryOne(mediaServerId); } @Override public void syncCatchFromDatabase() { List<MediaServerItem> allInCatch = getAll(); src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -17,10 +17,9 @@ /** * 计数器。为cseq进行计数 * * @param method sip 方法 * @return */ Long getCSEQ(String method); Long getCSEQ(); /** * 开始播放时将流存入 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -42,8 +42,8 @@ private UserSetting userSetting; @Override public Long getCSEQ(String method) { String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId() + "_" + method; public Long getCSEQ() { String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId(); long result = redis.incr(key, 1L); if (result > Integer.MAX_VALUE) {