src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -670,6 +670,7 @@ try { SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); if (transaction == null) { logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId); SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>(); @@ -685,7 +686,12 @@ if (stream == null) return; dialog = streamSession.getDialogByStream(deviceId, channelId, stream); } if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } if (dialog == null) { logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId); @@ -730,12 +736,6 @@ dialog.sendRequest(clientTransaction); if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } } catch (SipException | ParseException e) { e.printStackTrace(); } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -372,12 +372,12 @@ } } if (playTransaction == null) { SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); String streamId = null; if (mediaServerItem.isRtpEnable()) { sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); }else { sendRtpItem.setStreamId(ssrcInfo.getStream()); streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -100,7 +100,6 @@ subscribeHolder.removeCatalogSubscribe(platformGBId); subscribeHolder.removeMobilePositionSubscribe(platformGBId); } } } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -191,7 +191,7 @@ JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); ret.put("enableHls", true); ret.put("enable_hls", true); String mediaServerId = json.getString("mediaServerId"); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); if (subscribe != null) { @@ -206,9 +206,9 @@ String app = json.getString("app"); String stream = json.getString("stream"); if ("rtp".equals(app)) { ret.put("enableMP4", userSetup.getRecordSip()); ret.put("enable_mp4", userSetup.getRecordSip()); }else { ret.put("enableMP4", userSetup.isRecordPushLive()); ret.put("enable_mp4", userSetup.isRecordPushLive()); } List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -10,8 +10,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.HashMap; import java.util.Map; import java.util.*; @Component public class ZLMRTPServerFactory { @@ -23,54 +22,80 @@ private int[] portRangeArray = new int[2]; public int createRTPServer(MediaServerItem mediaServerItem, String streamId) { Map<String, Integer> currentStreams = new HashMap<>(); public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) { if (endPort <= startPort) return -1; if (usedFreelist == null) { usedFreelist = new ArrayList<>(); } JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem); if (listRtpServerJsonResult != null) { JSONArray data = listRtpServerJsonResult.getJSONArray("data"); if (data != null) { for (int i = 0; i < data.size(); i++) { JSONObject dataItem = data.getJSONObject(i); currentStreams.put(dataItem.getString("stream_id"), dataItem.getInteger("port")); usedFreelist.add(dataItem.getInteger("port")); } } } // 已经在推流 if (currentStreams.get(streamId) != null) { Map<String, Object> closeRtpServerParam = new HashMap<>(); closeRtpServerParam.put("stream_id", streamId); zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam); currentStreams.remove(streamId); } Map<String, Object> param = new HashMap<>(); int result = -1; // 不设置推流端口端则使用随机端口 if (!StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){ int newPort = getPortFromportRange(mediaServerItem); param.put("port", newPort); // 设置推流端口 if (startPort%2 == 1) { startPort ++; } boolean checkPort = false; for (int i = startPort; i < endPort + 1; i+=2) { if (!usedFreelist.contains(i)){ checkPort = true; startPort = i; break; } } if (!checkPort) { logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort); return -1; } param.put("port", startPort); String stream = UUID.randomUUID().toString(); param.put("enable_tcp", 1); param.put("stream_id", streamId); param.put("stream_id", stream); param.put("port", 0); JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); if (openRtpServerResultJson != null) { switch (openRtpServerResultJson.getInteger("code")){ case 0: result= openRtpServerResultJson.getInteger("port"); break; case -300: // id已经存在, 可能已经在其他端口推流 Map<String, Object> closeRtpServerParam = new HashMap<>(); closeRtpServerParam.put("stream_id", streamId); zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam); result = createRTPServer(mediaServerItem, streamId);; break; case -400: // 端口占用 result= createRTPServer(mediaServerItem, streamId); break; default: logger.error("创建RTP Server 失败 {}: " + openRtpServerResultJson.getString("msg"), param.get("port")); break; if (openRtpServerResultJson.getInteger("code") == 0) { result= openRtpServerResultJson.getInteger("port"); Map<String, Object> closeRtpServerParam = new HashMap<>(); closeRtpServerParam.put("stream_id", stream); zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam); }else { usedFreelist.add(startPort); startPort +=2; result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist); } }else { // 检查ZLM状态 logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port")); } return result; } public int createRTPServer(MediaServerItem mediaServerItem, String streamId) { Map<String, Object> param = new HashMap<>(); int result = -1; // 推流端口设置0则使用随机端口 param.put("enable_tcp", 1); param.put("stream_id", streamId); param.put("port", 0); JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); if (openRtpServerResultJson != null) { if (openRtpServerResultJson.getInteger("code") == 0) { result= openRtpServerResultJson.getInteger("port"); }else { logger.error("创建RTP Server 失败 {}: ", openRtpServerResultJson.getString("msg")); } }else { // 检查ZLM状态 @@ -99,32 +124,32 @@ return result; } private int getPortFromportRange(MediaServerItem mediaServerItem) { int currentPort = mediaServerItem.getCurrentPort(); if (currentPort == 0) { String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(","); if (portRangeStrArray.length != 2) { portRangeArray[0] = 30000; portRangeArray[1] = 30500; }else { portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]); portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]); } } if (currentPort == 0 || currentPort++ > portRangeArray[1]) { currentPort = portRangeArray[0]; mediaServerItem.setCurrentPort(currentPort); return portRangeArray[0]; } else { if (currentPort % 2 == 1) { currentPort++; } currentPort++; mediaServerItem.setCurrentPort(currentPort); return currentPort; } } // private int getPortFromportRange(MediaServerItem mediaServerItem) { // int currentPort = mediaServerItem.getCurrentPort(); // if (currentPort == 0) { // String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(","); // if (portRangeStrArray.length != 2) { // portRangeArray[0] = 30000; // portRangeArray[1] = 30500; // }else { // portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]); // portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]); // } // } // // if (currentPort == 0 || currentPort++ > portRangeArray[1]) { // currentPort = portRangeArray[0]; // mediaServerItem.setCurrentPort(currentPort); // return portRangeArray[0]; // } else { // if (currentPort % 2 == 1) { // currentPort++; // } // currentPort++; // mediaServerItem.setCurrentPort(currentPort); // return currentPort; // } // } /** * 创建一个国标推流 @@ -139,13 +164,18 @@ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){ // 使用RTPServer 功能找一个可用的端口 String playSsrc = serverItem.getSsrcConfig().getPlaySsrc(); int localPort = createRTPServer(serverItem, playSsrc); if (localPort != -1) { // TODO 高并发时可能因为未放入缓存而ssrc冲突 serverItem.getSsrcConfig().releaseSsrc(playSsrc); closeRTPServer(serverItem, playSsrc); String sendRtpPortRange = serverItem.getSendRtpPortRange(); if (StringUtils.isEmpty(sendRtpPortRange)) { return null; } String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(","); int localPort = -1; if (portRangeStrArray.length != 2) { localPort = getFreePort(serverItem, 30000, 30500, null); }else { localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null); } if (localPort == -1) { logger.error("没有可用的端口"); return null; } @@ -174,13 +204,19 @@ * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){ String playSsrc = serverItem.getSsrcConfig().getPlaySsrc(); int localPort = createRTPServer(serverItem, playSsrc); if (localPort != -1) { // TODO 高并发时可能因为未放入缓存而ssrc冲突 serverItem.getSsrcConfig().releaseSsrc(ssrc); closeRTPServer(serverItem, playSsrc); // 使用RTPServer 功能找一个可用的端口 String sendRtpPortRange = serverItem.getSendRtpPortRange(); if (StringUtils.isEmpty(sendRtpPortRange)) { return null; } String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(","); int localPort = -1; if (portRangeStrArray.length != 2) { localPort = getFreePort(serverItem, 30000, 30500, null); }else { localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null); } if (localPort == -1) { logger.error("没有可用的端口"); return null; } @@ -199,7 +235,7 @@ } /** * 调用zlm RESTful API —— startSendRtp * 调用zlm RESTFUL API —— startSendRtp */ public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) { Boolean result = false; @@ -208,9 +244,9 @@ logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { result= true; logger.info("RTP推流[ {}/{} ]请求成功,本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port")); logger.info("RTP推流成功[ {}/{} ],本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port")); } else { logger.error("RTP推流失败: " + jsonObject.getString("msg")); logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); } return jsonObject; } @@ -265,7 +301,7 @@ result= true; logger.info("停止RTP推流成功"); } else { logger.error("停止RTP推流失败: " + jsonObject.getString("msg")); logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); } return result; } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java
@@ -194,6 +194,9 @@ @JSONField(name = "rtp_proxy.port") private int rtpProxyPort; @JSONField(name = "rtp_proxy.port_range") private String portRange; @JSONField(name = "rtp_proxy.timeoutSec") private String rtpProxyTimeoutSec; @@ -802,4 +805,12 @@ public void setHookAliveInterval(int hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } public String getPortRange() { return portRange; } public void setPortRange(String portRange) { this.portRange = portRange; } } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java
@@ -91,7 +91,7 @@ streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS(); hookAliveInterval = zlmServerConfig.getHookAliveInterval(); rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口 rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号 sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 recordAssistPort = 0; // 默认关闭 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -521,6 +521,9 @@ // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 param.put("general.wait_track_ready_ms", "3000" ); if (mediaServerItem.isRtpEnable() && !StringUtils.isEmpty(mediaServerItem.getRtpPortRange())) { param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-")); } JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -411,38 +411,38 @@ // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入 for (StreamPushItem streamPushItem : streamPushItemsForPlatform) { List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); if (platFormInfoList != null) { if (platFormInfoList.size() > 0) { for (String[] platFormInfoArray : platFormInfoList) { StreamPushItem streamPushItemForPlatform = new StreamPushItem(); streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); if (platFormInfoArray.length > 0) { // 数组 platFormInfoArray 0 为平台ID。 1为目录ID // 不存在这个平台,则忽略导入此关联关系 if (platformInfoMap.get(platFormInfoArray[0]) == null || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); continue; } streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); List<GbStream> gbStreamList = platformForEvent.get(streamPushItem.getPlatformId()); if (gbStreamList == null) { gbStreamList = new ArrayList<>(); platformForEvent.put(platFormInfoArray[0], gbStreamList); } // 为发送通知整理数据 streamPushItemForPlatform.setName(streamPushItem.getName()); streamPushItemForPlatform.setApp(streamPushItem.getApp()); streamPushItemForPlatform.setStream(streamPushItem.getStream()); streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); gbStreamList.add(streamPushItemForPlatform); if (platFormInfoList != null && platFormInfoList.size() > 0) { for (String[] platFormInfoArray : platFormInfoList) { StreamPushItem streamPushItemForPlatform = new StreamPushItem(); streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); if (platFormInfoArray.length > 0) { // 数组 platFormInfoArray 0 为平台ID。 1为目录ID // 不存在这个平台,则忽略导入此关联关系 if (platformInfoMap.get(platFormInfoArray[0]) == null || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); continue; } if (platFormInfoArray.length > 1) { streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); if (platFormInfoArray[0].equals("34020000002110000001")) { System.out.println(111); } streamPushItemListFroPlatform.add(streamPushItemForPlatform); List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]); if (gbStreamList == null) { gbStreamList = new ArrayList<>(); platformForEvent.put(platFormInfoArray[0], gbStreamList); } // 为发送通知整理数据 streamPushItemForPlatform.setName(streamPushItem.getName()); streamPushItemForPlatform.setApp(streamPushItem.getApp()); streamPushItemForPlatform.setStream(streamPushItem.getStream()); streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); gbStreamList.add(streamPushItemForPlatform); } if (platFormInfoArray.length > 1) { streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); } streamPushItemListFroPlatform.add(streamPushItemForPlatform); } } src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -885,12 +885,11 @@ List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); if (parentPlatforms.size() > 0) { for (ParentPlatform parentPlatform : parentPlatforms) { streamPushItem.setCatalogId(parentPlatform.getCatalogId()); streamPushItem.setPlatformId(parentPlatform.getServerGBId()); String stream = streamPushItem.getStream(); StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(streamPushItem.getApp(), stream, StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(streamPushItem.getApp(), streamPushItem.getStream(), parentPlatform.getServerGBId()); if (streamProxyItems == null) { streamPushItem.setCatalogId(parentPlatform.getCatalogId()); streamPushItem.setPlatformId(parentPlatform.getServerGBId()); platformGbStreamMapper.add(streamPushItem); eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), streamPushItem, CatalogEvent.ADD); } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -109,7 +109,6 @@ // 录像查询以channelId作为deviceId查询 String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId; resultHolder.put(key, uuid, result); Device device = storager.queryVideoDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); if (streamInfo == null) { RequestMessage msg = new RequestMessage(); @@ -120,15 +119,14 @@ storager.stopPlay(deviceId, channelId); return result; } cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, (event) -> { cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, eventResult -> { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(key); //Response response = event.getResponse(); msg.setData(String.format("success")); resultHolder.invokeAllResult(msg); RequestMessage msgForSuccess = new RequestMessage(); msgForSuccess.setId(uuid); msgForSuccess.setKey(key); msgForSuccess.setData(String.format("success")); resultHolder.invokeAllResult(msgForSuccess); }); if (deviceId != null || channelId != null) { src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java
@@ -168,16 +168,6 @@ result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult)); return result; } // if (!file.getContentType().endsWith(".xls") // && !file.getContentType().endsWith(".csv") // && !file.getContentType().endsWith(".xlsx") ) { // logger.warn("通道导入文件类型错误: {}",file.getContentType() ); // WVPResult<Object> wvpResult = new WVPResult<>(); // wvpResult.setCode(-1); // wvpResult.setMsg("文件类型错误,请使用"); // result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult)); // return result; // } // 同时只处理一个文件 if (resultHolder.exist(key, null)) { logger.warn("已有导入任务正在执行"); src/main/resources/all-application.yml
@@ -147,7 +147,7 @@ rtp: # [可选] 是否启用多端口模式, 开启后会在portRange范围内选择端口用于媒体流传输 enable: true # [可选] 在此范围内选择端口用于媒体流传输, # [可选] 在此范围内选择端口用于媒体流传输, 必须提前在zlm上配置该属性,不然自动配置此属性可能不成功 port-range: 30000,30500 # 端口范围 # [可选] 国标级联在此范围内选择端口发送媒体流, send-port-range: 30000,30500 # 端口范围 src/main/resources/application-dev.yml
@@ -76,7 +76,7 @@ rtp: # [可选] 是否启用多端口模式, 开启后会在portRange范围内选择端口用于媒体流传输 enable: true # [可选] 在此范围内选择端口用于媒体流传输, # [可选] 在此范围内选择端口用于媒体流传输, 必须提前在zlm上配置该属性,不然自动配置此属性可能不成功 port-range: 30000,30500 # 端口范围 # [可选] 国标级联在此范围内选择端口发送媒体流, send-port-range: 30000,30500 # 端口范围 web_src/src/components/dialog/chooseChannelForStream.vue
@@ -174,7 +174,7 @@ page: that.currentPage, count: that.count, query: that.searchSrt, pushing: that.online, pushing: that.pushing, platformId: that.platformId, catalogId: that.catalogId, mediaServerId: that.mediaServerId