From c1ac16bf620d09a94a0cfe184cbe1c9cb531a01b Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期六, 23 三月 2024 15:38:54 +0800 Subject: [PATCH] 优化媒体节点服务的代码结构 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 149 ++++++++++++++++++++++++------------------------- 1 files changed, 74 insertions(+), 75 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b6cd90e..a247a78 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -10,19 +10,20 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.zlm.dto.HookType; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; +import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; +import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; @@ -71,9 +73,6 @@ @Autowired private AudioBroadcastManager audioBroadcastManager; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private IPlayService playService; @@ -124,9 +123,6 @@ private VideoStreamSessionManager sessionManager; @Autowired - private AssistRESTfulUtils assistRESTfulUtils; - - @Autowired private SSRCFactory ssrcFactory; @Qualifier("taskExecutor") @@ -136,6 +132,9 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + /** * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� */ @@ -143,18 +142,16 @@ @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { - - - taskExecutor.execute(() -> { - List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); - if (subscribes != null && subscribes.size() > 0) { - for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, param); - } + try { + HookZlmServerKeepaliveEvent event = new HookZlmServerKeepaliveEvent(this); + MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); + if (mediaServerItem != null) { + event.setMediaServerItem(mediaServerItem); + applicationEventPublisher.publishEvent(event); } - }); - mediaServerService.updateMediaServerKeepalive(param.getMediaServerId(), param.getData()); - + }catch (Exception e) { + logger.info("[ZLM-HOOK-蹇冭烦] 鍙戦�侀�氱煡澶辫触 ", e); + } return HookResult.SUCCESS(); } @@ -162,11 +159,10 @@ * 鎾斁鍣ㄩ壌鏉冧簨浠讹紝rtsp/rtmp/http-flv/ws-flv/hls鐨勬挱鏀鹃兘灏嗚Е鍙戞閴存潈浜嬩欢銆� */ @ResponseBody - @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public HookResult onPlay(@RequestBody OnPlayHookParam param) { if (logger.isDebugEnabled()) { - logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}" + param.getMediaServerId(), param); + logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}", param.getMediaServerId(), param); } String mediaServerId = param.getMediaServerId(); @@ -174,12 +170,13 @@ JSONObject json = (JSONObject) JSON.toJSON(param); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json); if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); + MediaServer mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { subscribe.response(mediaInfo, param); } } }); + // TODO 姝ゅ閫昏緫閫傚悎杩佺Щ鍒癕ediaService涓� if (!"rtp".equals(param.getApp())) { Map<String, String> paramMap = urlParamToMap(param.getParams()); StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); @@ -201,10 +198,11 @@ JSONObject json = (JSONObject) JSON.toJSON(param); logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈锛歿}->{}", param.getMediaServerId(), param); + // TODO 鍔犲揩澶勭悊閫熷害 String mediaServerId = json.getString("mediaServerId"); - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (mediaInfo == null) { + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer == null) { return new HookResultForOnPublish(200, "success"); } // 鎺ㄦ祦閴存潈鐨勫鐞� @@ -252,11 +250,7 @@ taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { - if (mediaInfo != null) { - subscribe.response(mediaInfo, param); - } else { - new HookResultForOnPublish(1, "zlm not register"); - } + subscribe.response(mediaServer, param); } }); @@ -267,12 +261,12 @@ result.setEnable_mp4(userSetting.isRecordPushLive()); } // 鍥芥爣娴� - if ("rtp".equals(param.getApp()) ) { + if ("rtp".equals(param.getApp())) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); // 鍗曠鍙fā寮忎笅淇敼娴� ID - if (!mediaInfo.isRtpEnable() && inviteInfo == null) { + if (!mediaServer.isRtpEnable() && inviteInfo == null) { String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16)); inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc); if (inviteInfo != null) { @@ -318,13 +312,17 @@ result.setEnable_audio(true); } } + } else if (param.getApp().equals("broadcast")) { + result.setEnable_audio(true); + } else if (param.getApp().equals("talk")) { + result.setEnable_audio(true); } if (param.getApp().equalsIgnoreCase("rtp")) { String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); - OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey); + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey); String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream(); - OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS); + OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS); if (otherRtpSendInfo != null || otherPsSendInfo != null) { result.setEnable_mp4(true); } @@ -347,13 +345,10 @@ logger.info("[ZLM HOOK] 娴佹敞閿�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); } - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("msg", "success"); - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); + MediaServer mediaInfo = mediaServerService.getOne(param.getMediaServerId()); if (mediaInfo == null) { logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId()); return; @@ -362,7 +357,6 @@ subscribe.response(mediaInfo, param); } - List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks(); // TODO 閲嶆瀯姝ゅ閫昏緫 if (param.isRegist()) { // 澶勭悊娴佹敞鍐岀殑閴存潈淇℃伅锛� 娴佹敞閿�杩欓噷涓嶅啀鍒犻櫎閴存潈淇℃伅锛屼笅娆℃潵浜嗘柊鐨勯壌鏉冧俊鎭細瀵瑰氨鐨勮繘琛岃鐩� @@ -379,7 +373,6 @@ redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } } - if ("rtsp".equals(param.getSchema())) { logger.info("娴佸彉鍖栵細娉ㄥ唽->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); if (param.isRegist()) { @@ -467,7 +460,7 @@ callId = streamAuthorityInfo.getCallId(); } StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, - param.getApp(), param.getStream(), tracks, callId); + param.getApp(), param.getStream(), MediaInfo.getInstance(param), callId); param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() @@ -500,7 +493,7 @@ GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { if (userSetting.isUsePushingAsStatus()) { - eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); } } if (type != null) { @@ -524,33 +517,31 @@ ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); Device device = deviceService.getDevice(platformId); - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� - logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - } + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� + logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); } } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } } } } } - } }); return HookResult.SUCCESS(); @@ -581,9 +572,9 @@ } // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ�� if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( inviteInfo.getChannelId()); - if (sendRtpItems.size() > 0) { + if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); try { @@ -612,14 +603,14 @@ if (info != null) { cmder.streamByeCmd(device, inviteInfo.getChannelId(), inviteInfo.getStream(), null); - }else { + } else { logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囩殑鐐规挱淇℃伅锛� {}锛� 娴侊細{}", inviteInfo.getDeviceId(), param.getStream()); } } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[鏃犱汉瑙傜湅]鐐规挱锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); } - }else { + } else { logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囷細 {}锛屾祦锛歿}", inviteInfo.getDeviceId(), param.getStream()); } @@ -677,7 +668,7 @@ DeferredResult<HookResult> defaultResult = new DeferredResult<>(); - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + MediaServer mediaInfo = mediaServerService.getOne(param.getMediaServerId()); if (!userSetting.isAutoApplyPlay() || mediaInfo == null) { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; @@ -729,7 +720,7 @@ }); } return result; - }else if(s.length == 4){ + } else if (s.length == 4) { // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿 String startTimeStr = s[2]; String endTimeStr = s[3]; @@ -763,14 +754,14 @@ if (!exist) { SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null, - device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); + device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { msg.setData(new HookResult(code, message)); resultHolder.invokeResult(msg); }); } return result; - }else { + } else { defaultResult.setResult(HookResult.SUCCESS()); return defaultResult; } @@ -800,13 +791,22 @@ logger.info("[ZLM HOOK] zlm 鍚姩 " + zlmServerConfig.getGeneralMediaServerId()); taskExecutor.execute(() -> { List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started); - if (subscribes != null && subscribes.size() > 0) { + if (subscribes != null && !subscribes.isEmpty()) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, zlmServerConfig); } } - mediaServerService.zlmServerOnline(zlmServerConfig); }); + try { + HookZlmServerStartEvent event = new HookZlmServerStartEvent(this); + MediaServer mediaServerItem = mediaServerService.getOne(zlmServerConfig.getMediaServerId()); + if (mediaServerItem != null) { + event.setMediaServerItem(mediaServerItem); + applicationEventPublisher.publishEvent(event); + } + }catch (Exception e) { + logger.info("[ZLM-HOOK-ZLM鍚姩] 鍙戦�侀�氱煡澶辫触 ", e); + } return HookResult.SUCCESS(); } @@ -849,12 +849,11 @@ */ @ResponseBody @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8") - public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam + public HookResult onRtpServerTimeout(@RequestBody OnRtpServerTimeoutHookParam param) { logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc()); taskExecutor.execute(() -> { - JSONObject json = (JSONObject) JSON.toJSON(param); List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); if (subscribes != null && !subscribes.isEmpty()) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { -- Gitblit v1.8.0