From 19c996774edc4fd915eb4d61008df7e42461405f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 19 七月 2023 15:37:59 +0800 Subject: [PATCH] 合并主线 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 114 +++++++++++++++++++++++++++++++++------------------------ 1 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 400a941..a33665c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -23,14 +24,17 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -66,7 +70,7 @@ private AudioBroadcastManager audioBroadcastManager; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private IPlayService playService; @@ -123,6 +127,9 @@ @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + /** * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� */ @@ -131,14 +138,12 @@ @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { -// logger.info("[ZLM HOOK] 鏀跺埌zlm蹇冭烦锛�" + param.getMediaServerId()); taskExecutor.execute(() -> { List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); - JSONObject json = (JSONObject) JSON.toJSON(param); if (subscribes != null && subscribes.size() > 0) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, json); + subscribe.response(null, param); } } }); @@ -165,7 +170,7 @@ if (subscribe != null) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { - subscribe.response(mediaInfo, json); + subscribe.response(mediaInfo, param); } } }); @@ -198,13 +203,13 @@ if (userSetting.getPushAuthority()) { // 鎺ㄦ祦閴存潈 if (param.getParams() == null) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); return new HookResultForOnPublish(401, "Unauthorized"); } Map<String, String> paramMap = urlParamToMap(param.getParams()); String sign = paramMap.get("sign"); if (sign == null) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); return new HookResultForOnPublish(401, "Unauthorized"); } // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜 @@ -233,15 +238,12 @@ HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); - if (!"rtp".equals(param.getApp())) { - result.setEnable_audio(true); - } - + result.setEnable_audio(true); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { if (mediaInfo != null) { - subscribe.response(mediaInfo, json); + subscribe.response(mediaInfo, param); } else { new HookResultForOnPublish(1, "zlm not register"); } @@ -265,7 +267,6 @@ // 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { result.setMp4_max_second(10); - result.setEnable_audio(true); result.setEnable_mp4(true); } // 濡傛灉鏄痶alk瀵硅锛屽垯榛樿鑾峰彇澹伴煶 @@ -292,6 +293,14 @@ } } } + if (param.getApp().equalsIgnoreCase("rtp")) { + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey); + if (otherRtpSendInfo != null) { + result.setEnable_mp4(true); + } + } + logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 鍝嶅簲锛歿}->{}->>>>{}", param.getMediaServerId(), param, result); return result; } @@ -321,7 +330,7 @@ return; } if (subscribe != null) { - subscribe.response(mediaInfo, json); + subscribe.response(mediaInfo, param); } List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks(); @@ -455,33 +464,35 @@ GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { - eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); - } - if (type != null) { - // 鍙戦�佹祦鍙樺寲redis娑堟伅 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { + if (userSetting.isUsePushingAsStatus()) { + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); } } + if (type != null) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } } - if (!param.isRegist()) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); - if (sendRtpItems.size() > 0) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); + } + if (!param.isRegist()) { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); try { if (platform != null) { @@ -521,8 +532,6 @@ if ("rtp".equals(param.getApp())) { ret.put("close", userSetting.getStreamOnDemand()); // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇 -// StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); // 鐐规挱 if (inviteInfo != null) { @@ -545,13 +554,22 @@ } redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } } } } Device device = deviceService.getDevice(inviteInfo.getDeviceId()); if (device != null) { try { - if (inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()) != null) { + InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), + inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (info != null) { cmder.streamByeCmd(device, inviteInfo.getChannelId(), inviteInfo.getStream(), null); } @@ -578,13 +596,13 @@ // 鎷夋祦浠g悊 StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { - if (streamProxyItem.isEnable_remove_none_reader()) { + if (streamProxyItem.isEnableRemoveNoneReader()) { // 鏃犱汉瑙傜湅鑷姩绉婚櫎 ret.put("close", true); streamProxyService.del(param.getApp(), param.getStream()); - String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrc_url(); + String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrcUrl(); logger.info("[{}/{}]<-[{}] 鎷夋祦浠g悊鏃犱汉瑙傜湅宸茬粡绉婚櫎", param.getApp(), param.getStream(), url); - } else if (streamProxyItem.isEnable_disable_none_reader()) { + } else if (streamProxyItem.isEnableDisableNoneReader()) { // 鏃犱汉瑙傜湅鍋滅敤 ret.put("close", true); // 淇敼鏁版嵁 @@ -595,7 +613,7 @@ } return ret; } - // 鎺ㄦ祦鍏锋湁涓诲姩鎬э紝鏆傛椂涓嶅仛澶勭悊 + // TODO 鎺ㄦ祦鍏锋湁涓诲姩鎬э紝鏆傛椂涓嶅仛澶勭悊 // StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); // if (streamPushItem != null) { // // TODO 鍙戦�佸仠姝� @@ -660,7 +678,7 @@ resultHolder.put(key, uuid, result); if (!exist) { - playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> { + playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { msg.setData(new HookResult(code, message)); resultHolder.invokeResult(msg); }); @@ -669,7 +687,7 @@ } else { // 鎷夋祦浠g悊 StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); - if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnable_disable_none_reader()) { + if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { streamProxyService.start(param.getApp(), param.getStream()); } DeferredResult<HookResult> result = new DeferredResult<>(); @@ -693,7 +711,7 @@ List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started); if (subscribes != null && subscribes.size() > 0) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, jsonObject); + subscribe.response(null, zlmServerConfig); } } mediaServerService.zlmServerOnline(zlmServerConfig); @@ -749,7 +767,7 @@ List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); if (subscribes != null && subscribes.size() > 0) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, json); + subscribe.response(null, param); } } }); -- Gitblit v1.8.0