From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 16 四月 2024 22:10:35 +0800 Subject: [PATCH] 重构多wvp国标级联机制 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 45 +++++++++------------------------------------ 1 files changed, 9 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f12f38a..59ff50c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -29,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -86,16 +85,13 @@ private IRedisCatchStorage redisCatchStorage; @Autowired - private IInviteStreamService inviteStreamService; + private IRedisRpcService redisRpcService; @Autowired private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; - - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -120,9 +116,6 @@ @Autowired private UserSetting userSetting; - - @Autowired - private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -594,15 +587,17 @@ sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { + sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - if (pushListItem != null) { sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); - // 鎺ㄦ祦鐘舵�� + // 寮�濮嬫帹娴� sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { if (!platform.isStartOfflinePush()) { @@ -702,8 +697,6 @@ } // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setFromTag(request.getFromTag()); - sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); @@ -714,7 +707,6 @@ sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { // 涓嶅湪绾� 鎷夎捣 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); @@ -769,18 +761,14 @@ dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); try { - redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); - mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } }, userSetting.getPlatformPlayTimeout()); - redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); - // 娣诲姞涓婄嚎鐨勯�氱煡 - mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { + // + redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { dynamicTask.stop(sendRtpItem.getCallId()); - redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -813,19 +801,7 @@ // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItemFromRedis, request, platform); } - }); - // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(sendRtpItem.getCallId()); - mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); - } - } }); } @@ -836,12 +812,9 @@ */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendStartSendRtp(sendRtpItem); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(request.getCallIdHeader().getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); -- Gitblit v1.8.0