From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 16 四月 2024 22:10:35 +0800 Subject: [PATCH] 重构多wvp国标级联机制 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 106 ++++++++++++++++++++++++++-------------------------- 1 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 9346eb8..32bf76a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -18,10 +18,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.zlm.dto.HookType; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -73,9 +70,6 @@ private AudioBroadcastManager audioBroadcastManager; @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired private IPlayService playService; @Autowired @@ -106,9 +100,6 @@ private EventPublisher eventPublisher; @Autowired - private ZLMMediaListManager zlmMediaListManager; - - @Autowired private ZlmHttpHookSubscribe subscribe; @Autowired @@ -124,9 +115,6 @@ private VideoStreamSessionManager sessionManager; @Autowired - private AssistRESTfulUtils assistRESTfulUtils; - - @Autowired private SSRCFactory ssrcFactory; @Qualifier("taskExecutor") @@ -135,6 +123,9 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; + + @Autowired + private IStreamPushService streamPushService; /** * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� @@ -147,7 +138,7 @@ taskExecutor.execute(() -> { List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); - if (subscribes != null && subscribes.size() > 0) { + if (subscribes != null && !subscribes.isEmpty()) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, param); } @@ -166,7 +157,7 @@ @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public HookResult onPlay(@RequestBody OnPlayHookParam param) { if (logger.isDebugEnabled()) { - logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}" + param.getMediaServerId(), param); + logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}", param.getMediaServerId(), param); } String mediaServerId = param.getMediaServerId(); @@ -242,21 +233,14 @@ // 閴存潈閫氳繃 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } - } else { - zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); } - HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(true); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { - if (mediaInfo != null) { - subscribe.response(mediaInfo, param); - } else { - new HookResultForOnPublish(1, "zlm not register"); - } + subscribe.response(mediaInfo, param); } }); @@ -270,7 +254,6 @@ if ("rtp".equals(param.getApp())) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); - // 鍗曠鍙fā寮忎笅淇敼娴� ID if (!mediaInfo.isRtpEnable() && inviteInfo == null) { String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16)); @@ -278,6 +261,8 @@ if (inviteInfo != null) { result.setStream_replace(inviteInfo.getStream()); logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream()); + // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲 + param.setStream(inviteInfo.getStream()); } } @@ -474,8 +459,7 @@ || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - + streamPushService.updatePush(param); // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); } @@ -492,10 +476,13 @@ } } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 + if (gbStream == null) { + storager.removeMedia(param.getApp(), param.getStream()); + }else { +// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + storager.mediaOffline(param.getApp(), param.getStream()); } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { @@ -519,32 +506,46 @@ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); + if (sendRtpItem == null) { + continue; + } - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� - logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (sendRtpItem.getApp().equals(param.getApp())) { + logger.info(sendRtpItem.toString()); + if (userSetting.getServerId().equals(sendRtpItem.getServerId())) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId()); + // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦 + redisCatchStorage.sendPushStreamClose(messageForPushChannel); + }else { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); + + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� + logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } } } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } + } } } @@ -798,7 +799,7 @@ logger.info("[ZLM HOOK] zlm 鍚姩 " + zlmServerConfig.getGeneralMediaServerId()); taskExecutor.execute(() -> { List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started); - if (subscribes != null && subscribes.size() > 0) { + if (subscribes != null && !subscribes.isEmpty()) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, zlmServerConfig); } @@ -847,12 +848,11 @@ */ @ResponseBody @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8") - public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam + public HookResult onRtpServerTimeout(@RequestBody OnRtpServerTimeoutHookParam param) { logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc()); taskExecutor.execute(() -> { - JSONObject json = (JSONObject) JSON.toJSON(param); List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); if (subscribes != null && !subscribes.isEmpty()) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { -- Gitblit v1.8.0