From f961515317a33fe965287ca5c978b85e9ce1abcc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 27 六月 2023 17:18:46 +0800 Subject: [PATCH] 合并主线 --- src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 39 +++++++------------ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 12 ++--- src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 16 ++++---- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 5 +- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java | 22 +++++----- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java | 2 src/main/java/com/genersoft/iot/vmp/service/IPlayService.java | 3 + 7 files changed, 45 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index a160921..f32d420 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -56,7 +56,7 @@ //via ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), - Integer.parseInt(parentPlatform.getDevicePort()), parentPlatform.getTransport(), SipUtils.getNewViaTag()); + parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); //from diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index bbaf22c..ffd41db 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -6,9 +6,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -624,9 +622,9 @@ logger.info("[璇煶鍠婅瘽] {} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> { if (event != null) { - event.response(mediaServerItemInUse, json); + event.response(mediaServerItemInUse, hookParam); subscribe.removeSubscribe(hookSubscribeForStreamChange); } }); @@ -634,9 +632,9 @@ CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); callIdHeader.setCallId(callId); HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> { if (eventForPush != null) { - eventForPush.response(mediaServerItemInUse, json); + eventForPush.response(mediaServerItemInUse, hookParam); } }); // diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 792d901..30d6256 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -899,9 +900,9 @@ logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { if (event != null) { - event.response(mediaServerItemInUse, json); + event.response(mediaServerItemInUse, hookParam); subscribe.removeSubscribe(hookSubscribe); } }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index ab54d15..a970eb5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; @@ -127,10 +128,9 @@ // 娑堟伅鍙戦�佹垚鍔燂紝 鍚戜笂绾у彂閫乮nvite锛岃幏鍙栨帹娴� try { - platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{ + platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, hookParam)->{ + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; // 涓婄骇骞冲彴鎺ㄦ祦鎴愬姛 - String app = response.getString("app"); - String stream = response.getString("stream"); AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); if (broadcastCatch != null ) { if (playService.audioBroadcastInUse(device, targetId)) { @@ -138,24 +138,24 @@ platform.getServerGBId(), deviceChannel.getChannelId()); // 鏌ョ湅璇煶閫氶亾宸茬粡寤虹珛涓斿凡缁忓崰鐢� 鍥炲BYE try { - platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream); + platformService.stopBroadcast(platform, deviceChannel.getChannelId(), streamChangedHookParam.getStream()); } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { logger.info("[娑堟伅鍙戦�佸け璐 鍥芥爣绾ц仈 璇煶鍠婅瘽 platform锛� {}锛� channel: {}", platform.getServerGBId(), deviceChannel.getChannelId()); } }else { // 鏌ョ湅璇煶閫氶亾宸茬粡寤虹珛浣嗘槸鏈崰鐢� - broadcastCatch.setApp(app); - broadcastCatch.setStream(stream); + broadcastCatch.setApp(streamChangedHookParam.getApp()); + broadcastCatch.setStream(streamChangedHookParam.getStream()); broadcastCatch.setMediaServerItem(mediaServerItem); audioBroadcastManager.update(broadcastCatch); // 鎺ㄦ祦鍒拌澶� - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null); if (sendRtpItem == null) { - logger.warn("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 寮傚父锛屾湭鎵惧埌鍙戞祦淇℃伅锛� channelId: {}, stream: {}", targetId, stream); - logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 閲嶆柊寮�濮嬶紝channelId: {}, stream: {}", targetId, stream); + logger.warn("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 寮傚父锛屾湭鎵惧埌鍙戞祦淇℃伅锛� channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream()); + logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 閲嶆柊寮�濮嬶紝channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream()); try { - playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> { logger.info("[璇煶鍠婅瘽] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", device.getDeviceId(), targetId); }); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -173,7 +173,7 @@ } }else { try { - playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> { logger.info("[璇煶鍠婅瘽] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", device.getDeviceId(), targetId); }); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 95fb3ac..44bf11b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; @@ -28,7 +29,7 @@ ErrorCallback<Object> callback); SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback); - StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId); + StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId); MediaServerItem getNewMediaServerItem(Device device); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 6c33770..c43591b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.DynamicTask; @@ -16,6 +15,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; @@ -442,10 +442,11 @@ inviteStreamService.removeInviteInfo(inviteInfo); }else { // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉 - JSONObject json = new JSONObject(); - json.put("app", inviteInfo.getStreamInfo().getApp()); - json.put("stream", inviteInfo.getStreamInfo().getStream()); - hookEvent.response(mediaServerItemForStreamInfo, json); + OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam(); + hookParam.setApp(inviteInfo.getStreamInfo().getApp()); + hookParam.setStream(inviteInfo.getStreamInfo().getStream()); + + hookEvent.response(mediaServerItemForStreamInfo, hookParam); return; } } @@ -498,14 +499,14 @@ } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{ logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); + playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId); // 鏀跺埌娴� if (hookEvent != null) { - hookEvent.response(mediaServerItem, response); + hookEvent.response(mediaServerItem, hookParam); } }, event -> { // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc @@ -524,30 +525,20 @@ logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable()) { logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) { - // ssrc 涓嶅彲鐢� - // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); - event.msg = "涓嬬骇鑷畾涔変簡ssrc,浣嗘槸姝src涓嶅彲鐢�"; - event.statusCode = 400; - errorEvent.response(event); - return; - } - + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� if (!mediaServerItem.isRtpEnable()) { // 娣诲姞璁㈤槄 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId); - hookEvent.response(mediaServerItemInUse, response); + playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId); + hookEvent.response(mediaServerItemInUse, hookParam); }); } // 鍏抽棴rtp server diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index c345770..65c479f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -293,12 +293,12 @@ // 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴� try { - cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + response.toJSONString()); + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> { + logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookParam); dynamicTask.stop(timeOutTaskKey); // TODO 鏆備笉鍋氬鐞� - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { - logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + json.toJSONString()); + }, (mediaServerItemInuse, hookParam) -> { + logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookParam); dynamicTask.stop(timeOutTaskKey); }, (event) -> { @@ -617,10 +617,10 @@ } @Override - public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) { - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); Device device = redisCatchStorage.getDevice(deviceId); - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -1571,7 +1571,7 @@ } } - talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> { + talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> { logger.info("[璇煶瀵硅] 鏀跺埌璁惧鍙戞潵鐨勬祦"); }, eventResult -> { logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{}, 閿欒鐮� {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg); -- Gitblit v1.8.0