src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
@@ -64,6 +64,7 @@ // 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册 logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID:" + event.getPlatformGbID()); publisher.platformNotRegisterEventPublish(event.getPlatformGbID()); parentPlatformCatch.setKeepAliveReply(0); }else { // 再次发送心跳 String callId = sipCommanderForPlatform.keepalive(parentPlatform); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -154,6 +154,8 @@ } else if (Request.BYE.equals(method)) { ByeRequestProcessor processor = new ByeRequestProcessor(); processor.setRequestEvent(evt); processor.setRedisCatchStorage(redisCatchStorage); processor.setZlmrtpServerFactory(zlmrtpServerFactory); return processor; } else if (Request.CANCEL.equals(method)) { CancelRequestProcessor processor = new CancelRequestProcessor(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -893,7 +893,7 @@ catalogXml.append("</Query>\r\n"); String tm = Long.toString(System.currentTimeMillis()); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDev" + tm, null); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo" + tm, "FromDev" + tm, null); transmitRequest(device, request); @@ -923,7 +923,7 @@ catalogXml.append("</Query>\r\n"); String tm = Long.toString(System.currentTimeMillis()); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCat" + tm, null); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog" + tm, "FromCat" + tm, null); transmitRequest(device, request, errorEvent); } catch (SipException | ParseException | InvalidArgumentException e) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -118,18 +118,18 @@ try { StringBuffer keepaliveXml = new StringBuffer(200); keepaliveXml.append("<?xml version=\"1.0\" encoding=\"GB2312\" ?>\r\n"); keepaliveXml.append("<?xml version=\"1.0\"?>\r\n");//" encoding=\"GB2312\"?>\r\n"); keepaliveXml.append("<Notify>\r\n"); keepaliveXml.append("<CmdType>Keepalive</CmdType>\r\n"); keepaliveXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); keepaliveXml.append("<DeviceID>" + parentPlatform.getServerGBId() + "</DeviceID>\r\n"); keepaliveXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n"); keepaliveXml.append("<Status>OK</Status>\r\n"); keepaliveXml.append("</Notify>\r\n"); Request request = headerProviderPlarformProvider.createKeetpaliveMessageRequest( parentPlatform, keepaliveXml.toString(), UUID.randomUUID().toString().replace("-", ""), "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), UUID.randomUUID().toString().replace("-", ""), null); transmitRequest(parentPlatform, request); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java
@@ -22,12 +22,9 @@ @Component public class AckRequestProcessor extends SIPRequestAbstractProcessor { //@Autowired private IRedisCatchStorage redisCatchStorage; //@Autowired private ZLMRTPServerFactory zlmrtpServerFactory; /** * 处理 ACK请求 @@ -49,6 +46,8 @@ String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); sendRtpItem.setStreamId(streamInfo.getStreamId()); redisCatchStorage.updateSendRTPSever(sendRtpItem); System.out.println(platformGbId); System.out.println(channelId); Map<String, Object> param = new HashMap<>(); @@ -68,11 +67,16 @@ if (System.currentTimeMillis() - startTime < 30 * 1000) { if (zlmrtpServerFactory.isRtpReady(streamInfo.getStreamId())) { rtpPushed = true; System.out.println("已获取设备推流,开始向上级推流"); zlmrtpServerFactory.startSendRtpStream(param); } else { System.out.println("等待设备推流......."); Thread.sleep(2000); continue; } } else { rtpPushed = true; System.out.println("设备推流超时,终止向上级推流"); } } catch (InterruptedException e) { e.printStackTrace(); @@ -108,5 +112,4 @@ public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { this.zlmrtpServerFactory = zlmrtpServerFactory; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -1,13 +1,20 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;

import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;

import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;

import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;

/**
 * @Description: BYE request processor
@@ -15,6 +22,10 @@
 * @date:   May 3, 2020, 5:32:05 PM
 */
public class ByeRequestProcessor extends SIPRequestAbstractProcessor {

	/** Cache used to look up the {@link SendRtpItem} of a platform/channel pair; supplied through its setter. */
	private IRedisCatchStorage redisCatchStorage;

	/** Gateway to the ZLM media server used to stop the outgoing RTP stream; supplied through its setter. */
	private ZLMRTPServerFactory zlmrtpServerFactory;

	/**
	 * Handles a BYE request: replies through {@code responseAck} and, when the dialog of the
	 * request is in the TERMINATED state, asks ZLM to stop the RTP stream that was being sent
	 * for the platform/channel pair named by the dialog's remote and local URIs.
@@ -24,6 +35,22 @@
	public void process(RequestEvent evt) {
		try {
			responseAck(evt);
			Dialog dialog = evt.getDialog();
			if (dialog == null) return;
			// Compare from the constant side: getState() can be null for a dialog
			// that has not been mapped to any state yet.
			if (DialogState.TERMINATED.equals(dialog.getState())) {
				String remoteUri = dialog.getRemoteParty().getURI().toString();
				String localUri = dialog.getLocalParty().getURI().toString();
				// Each id is cut out of its OWN URI; the previous code sliced localUri with
				// offsets computed on remoteUri, which is only right when both share a layout.
				String platformGbId = userPartOf(remoteUri);
				String channelId = userPartOf(localUri);
				if (platformGbId == null || channelId == null) return;
				SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
				// Nothing recorded for this pair: there is no stream to stop.
				if (sendRtpItem == null) return;
				String streamId = sendRtpItem.getStreamId();
				Map<String, Object> param = new HashMap<>();
				param.put("vhost","__defaultVhost__");
				param.put("app","rtp");
				param.put("stream",streamId);
				System.out.println("停止向上级推流:" + streamId);
				zlmrtpServerFactory.stopSendRtpStream(param);
			}
		} catch (SipException e) {
			e.printStackTrace();
		} catch (InvalidArgumentException e) {
@@ -47,4 +74,19 @@
		getServerTransaction(evt).sendResponse(response);
	}

	/** @return the cache storage currently set on this processor */
	public IRedisCatchStorage getRedisCatchStorage() {
		return redisCatchStorage;
	}

	/** @param redisCatchStorage cache storage that {@code process} queries for the send-RTP item */
	public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
		this.redisCatchStorage = redisCatchStorage;
	}

	/** @return the ZLM server factory currently set on this processor */
	public ZLMRTPServerFactory getZlmrtpServerFactory() {
		return zlmrtpServerFactory;
	}

	/** @param zlmrtpServerFactory factory that {@code process} uses to stop the RTP stream */
	public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
		this.zlmrtpServerFactory = zlmrtpServerFactory;
	}

	/**
	 * Extracts the text between the first ':' and the first '@' of a URI string.
	 *
	 * @param uri URI text, e.g. {@code sip:user@host:port}
	 * @return the extracted part, or {@code null} when '@' is missing or precedes the first ':'
	 */
	private static String userPartOf(String uri) {
		int start = uri.indexOf(':') + 1;
		int end = uri.indexOf('@');
		return (end >= start) ? uri.substring(start, end) : null;
	}
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -123,4 +123,8 @@
	public JSONObject startSendRtp(Map<String, Object> param) {
		// Forward the caller's parameters to the "startSendRtp" endpoint and hand back the reply as-is.
		final JSONObject reply = sendPost("startSendRtp", param);
		return reply;
	}

	/**
	 * Posts {@code param} to the "stopSendRtp" endpoint through {@code sendPost}.
	 *
	 * @param param request parameters, passed through unchanged
	 * @return whatever {@code sendPost} returns for this call
	 */
	public JSONObject stopSendRtp(Map<String, Object> param) {
		final JSONObject reply = sendPost("stopSendRtp", param);
		return reply;
	}
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -127,46 +127,46 @@ } /** * * 调用zlm RESTful API —— startSendRtp */ public Boolean startSendRtpStream(Map<String, Object>param) { Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.startSendRtp(param); System.out.println(jsonObject); if (jsonObject != null) { switch (jsonObject.getInteger("code")){ case 0: if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { result= true; logger.error("RTP推流请求成功,本地推流端口:" + jsonObject.getString("local_port")); break; // case -300: // id已经存在 // result = false; // break; // case -400: // 端口占用 // result= false; // break; default: logger.error("RTP推流失败: " + jsonObject.getString("msg")); break; } }else { // 检查ZLM状态 logger.error("RTP推流失败: 请检查ZLM服务"); logger.error("RTP推流失败: " + jsonObject.getString("msg")); } return result; } /** * * 查询待转推的流是否就绪 */ public Boolean isRtpReady(String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) { logger.info("设备RTP推流成功"); return true; } else { logger.info("设备RTP推流未完成"); return false; return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } /** * 调用zlm RESTful API —— stopSendRtp */ public Boolean stopSendRtpStream(Map<String, Object>param) { Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(param); System.out.println(jsonObject); if (jsonObject == null) { logger.error("停止RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { result= true; logger.error("停止RTP推流成功"); } else { logger.error("停止RTP推流失败: " + jsonObject.getString("msg")); } return result; } }