From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 60 +++++++++++++++++++++++++++++------------------------------- 1 files changed, 29 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f1f277f..de1a929 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,25 +19,19 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -341,8 +335,12 @@ return; } String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getConnection().getAddress(); - + String addressStr; + if(StringUtils.isEmpty(platform.getSendStreamIp())){ + addressStr = sdp.getConnection().getAddress(); + }else { + addressStr = platform.getSendStreamIp(); + } Device device = null; // 閫氳繃 channel 鍜� gbStream 鏄惁涓簄ull 鍊煎垽鏂潵婧愭槸鐩存挱娴佸悎閫傚浗鏍� @@ -468,7 +466,8 @@ if (sendRtpItem.isTcpActive()) { MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); try { - mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5); + mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5); + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform); }catch (ControllerException e) {} } } catch (SipException | InvalidArgumentException | ParseException e) { @@ -491,7 +490,7 @@ String startTimeStr = DateUtil.urlFormatter.format(start); String endTimeStr = DateUtil.urlFormatter.format(end); String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, device.getStreamModeForParam()); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.getHasAudio(), false, device.getStreamModeForParam()); sendRtpItem.setStream(stream); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -521,7 +520,7 @@ } sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam()); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.getHasAudio(), false, device.getStreamModeForParam()); sendRtpItem.setStream(ssrcInfo.getStream()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -596,11 +595,10 @@ // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); if (pushListItem != null) { - sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setServerId(pushListItem.getServerId()); sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); - StreamPushItem transform = streamPushService.transform(pushListItem); - transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId())); redisCatchStorage.updateSendRTPSever(sendRtpItem); // 寮�濮嬫帹娴� sendPushStream(sendRtpItem, mediaServerItem, platform, request); @@ -657,9 +655,10 @@ /** * 瀹夋帓鎺ㄦ祦 */ - private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); - if (streamReady != null && streamReady) { + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + + if (mediaInfo != null) { // 鑷钩鍙板唴瀹� int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -677,7 +676,7 @@ sendRtpItem.setStatus(1); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); - SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } @@ -685,11 +684,11 @@ } } - private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + private void sendPushStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { // 鎺ㄦ祦 if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); - if (streamReady != null && streamReady) { + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaInfo != null ) { // 鑷钩鍙板唴瀹� int localPort = sendRtpPortManager.getNextPort(mediaServerItem); if (localPort == 0) { @@ -726,20 +725,19 @@ /** * 閫氱煡娴佷笂绾� */ - private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); // 鐩戝惉娴佷笂绾� - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; - logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), mediaServerItem.getId()); + hookSubscribe.addSubscribe(hook, (hookData)->{ + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); dynamicTask.stop(sendRtpItem.getCallId()); sendProxyStream(sendRtpItem, mediaServerItem, platform, request); }); dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + hookSubscribe.removeSubscribe(hook); }, userSetting.getPlatformPlayTimeout()); boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); if (!start) { @@ -748,7 +746,7 @@ } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + hookSubscribe.removeSubscribe(hook); dynamicTask.stop(sendRtpItem.getCallId()); } } @@ -756,7 +754,7 @@ /** * 閫氱煡娴佷笂绾� */ - private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤 logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, @@ -860,7 +858,7 @@ redisCatchStorage.updateSendRTPSever(sendRtpItem); } - public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { + public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) { String sdpIp = sendRtpItem.getLocalIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { -- Gitblit v1.8.0