From f9abfca003bc9515f1f6028657fa6347326a1402 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 15 四月 2024 21:33:22 +0800 Subject: [PATCH] 临时提交 --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 12 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 386 +++++++++----------- /dev/null | 474 ------------------------ src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 28 + src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 12 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 3 src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 1 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java | 81 ++++ src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java | 4 src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 2 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java | 81 ++++ 11 files changed, 395 insertions(+), 689 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index d19b8f0..af574b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -72,6 +72,8 @@ public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_"; public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; + public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; + public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index c14ebcd..fb88f54 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -32,9 +32,6 @@ private RedisStreamMsgListener redisStreamMsgListener; @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - - @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired @@ -48,6 +45,12 @@ @Autowired private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; + + @Autowired + private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener; + + @Autowired + private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; /** @@ -65,12 +68,13 @@ container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); - container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); + container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM)); + container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 30193d2..c0507df 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -22,6 +22,11 @@ */ private String platformId; + /** + * 骞冲彴鍚嶇О + */ + private String platformName; + /** * 瀵瑰簲璁惧id */ @@ -60,6 +65,11 @@ * 鏄惁涓簍cp涓诲姩妯″紡 */ private boolean tcpActive; + + /** + * 鑷繁鎺ㄦ祦浣跨敤鐨処P + */ + private String localIp; /** * 鑷繁鎺ㄦ祦浣跨敤鐨勭鍙� @@ -306,6 +316,22 @@ this.receiveStream = receiveStream; } + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + public String getLocalIp() { + return localIp; + } + + public void setLocalIp(String localIp) { + this.localIp = localIp; + } + @Override public String toString() { return "SendRtpItem{" + @@ -313,6 +339,7 @@ ", port=" + port + ", ssrc='" + ssrc + '\'' + ", platformId='" + platformId + '\'' + + ", platformName='" + platformName + '\'' + ", deviceId='" + deviceId + '\'' + ", app='" + app + '\'' + ", channelId='" + channelId + '\'' + @@ -320,6 +347,7 @@ ", stream='" + stream + '\'' + ", tcp=" + tcp + ", tcpActive=" + tcpActive + + ", localIp=" + localIp + ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 96b8b11..3539922 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -28,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -127,12 +127,11 @@ @Autowired private SipConfig config; - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - @Autowired private VideoStreamSessionManager streamSession; + + @Autowired + private SendRtpPortManager sendRtpPortManager; @Override @@ -577,21 +576,40 @@ }else { ssrc = gb28181Sdp.getSsrc(); } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setTcpActive(tcpActive); + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setPlatformName(platform.getName()); + sendRtpItem.setPlatformId(platform.getServerGBId()); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setIp(addressStr); + sendRtpItem.setPort(port); + sendRtpItem.setUsePs(true); + sendRtpItem.setApp(gbStream.getApp()); + sendRtpItem.setStream(gbStream.getStream()); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setOnlyAudio(false); + sendRtpItem.setPlayType(InviteStreamType.PUSH); + sendRtpItem.setStatus(0); if ("push".equals(gbStream.getStreamType())) { if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + + sendRtpItem.setServerId(pushListItem.getSeverId()); if (pushListItem != null) { StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 鎺ㄦ祦鐘舵�� - pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + pushStream(sendRtpItem, mediaServerItem, platform, request); }else { // 鏈帹娴� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { @@ -601,8 +619,7 @@ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } else { //寮�鍚唬鐞嗘媺娴� - notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } @@ -659,8 +676,9 @@ sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } @@ -670,19 +688,14 @@ } - private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { + private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // 鎺ㄦ祦 - if (streamPushItem.isSelf()) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); @@ -691,16 +704,11 @@ } return; } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } @@ -708,210 +716,168 @@ } else { // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } - } else { + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setTcpActive(); // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + otherWvpPushStream(sendRtpItem, request, platform); } } /** * 閫氱煡娴佷笂绾� */ - private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - // 鐩戝惉娴佷笂绾� - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; - logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); + private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + // 鐩戝惉娴佷笂绾� + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + dynamicTask.stop(callIdHeader.getCallId()); + pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }); + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + }, userSetting.getPlatformPlayTimeout()); + boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + if (!start) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } - } else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 - try { - logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), null, gbStream.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - }, userSetting.getPlatformPlayTimeout()); - // 娣诲姞鐩戝惉 - int finalPort = port; - Boolean finalTcpActive = tcpActive; - - // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }); - - // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); - } - } - }); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(callIdHeader.getCallId()); } } /** - * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + * 閫氱煡娴佷笂绾� */ - private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); - // 鍙戦�乺edis娑堟伅 - redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), - streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> { - SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); - if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - // 鏀跺埌sendItem - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); + private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + if (!platform.isStartOfflinePush()) { + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + return; + } + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }, (wvpResult) -> { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), + platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + try { + redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + }, userSetting.getPlatformPlayTimeout()); + // 鍐欏叆redis寰呭彂娴佷俊鎭紝渚涘叾浠杦vp璇诲彇骞剁敓鎴愬彂娴佷俊鎭� + SendRtpItem sendRtpItemTemp = new SendRtpItem(); + sendRtpItemTemp.setIp(addressStr); + sendRtpItemTemp.setPort(port); + sendRtpItemTemp.setSsrc(ssrc); + sendRtpItemTemp.setPlatformId(requesterId); + sendRtpItemTemp.setPlatformName(platform.getName()); + sendRtpItemTemp.setTcp(mediaTransmissionTCP); + sendRtpItemTemp.setRtcp(platform.isRtcp()); + sendRtpItemTemp.setTcpActive(tcpActive); + sendRtpItemTemp.setPlayType(InviteStreamType.PUSH); + redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout()); + // 娣诲姞涓婄嚎鐨勯�氱煡 + mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> { + dynamicTask.stop(callIdHeader.getCallId()); + redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { - // 閿欒 - if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { - // 绂荤嚎 - // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isPushIng()) { - // 鍦ㄧ嚎鐘舵�� - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - - } else { - // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - } + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage()); + } catch (SipException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } catch (InvalidArgumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } catch (ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - }); + return; + } + sendRtpItemTemp.setLocalPort(localPort); + sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): ); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItemTemp.setStatus(1); + sendRtpItemTemp.setCallId(callIdHeader.getCallId()); + + sendRtpItemTemp.setFromTag(request.getFromTag()); + SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform); + if (response != null) { + sendRtpItemTemp.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItemTemp); + } else { + // 鍏朵粬骞冲彴鍐呭 + otherWvpPushStream(sendRtpItemFromRedis, request, platform); + } + }); + + // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 + redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(callIdHeader.getCallId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); + } + } + }); } - public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { - String sdpIp = mediaServerItem.getSdpIp(); + + /** + * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + */ + private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { + logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendStartSendRtp(sendRtpItem); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(request.getCallIdHeader().getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + + public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) { + + String sdpIp = sendRtpItem.getLocalIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sdpIp = platform.getSendStreamIp(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java index 714838e..6b3c94f 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.dto; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; + import java.text.ParseException; /** @@ -7,5 +9,5 @@ */ public interface ChannelOnlineEvent { - void run(String app, String stream, String serverId) throws ParseException; + void run(SendRtpItem sendRtpItem) throws ParseException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 1a9e3e5..6a4f866 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -61,6 +61,7 @@ messageForPushChannel.setGbId(gbId); messageForPushChannel.setApp(app); messageForPushChannel.setStream(stream); + messageForPushChannel.setServerId(serverId); messageForPushChannel.setMediaServerId(mediaServerId); messageForPushChannel.setPlatFormId(platFormId); messageForPushChannel.setPlatFormName(platFormName); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java deleted file mode 100755 index 3b990f0..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ /dev/null @@ -1,474 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - - -/** - * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾� - * @author lin - */ -@Component -public class RedisGbPlayMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); - - public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; - - /** - * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥 - */ - public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; - - /** - * 绂荤嚎鐨勯敊璇帥 - */ - public static final int ERROR_CODE_OFFLINE = -2; - - /** - * 瓒呮椂鐨勯敊璇帥 - */ - public static final int ERROR_CODE_TIMEOUT = -3; - - private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); - private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); - - @Autowired - private UserSetting userSetting; - - - @Autowired - private RedisTemplate<Object, Object> redisTemplate; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - - @Autowired - private DynamicTask dynamicTask; - - - @Autowired - private ZlmHttpHookSubscribe subscribe; - - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - public interface PlayMsgCallback{ - void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; - } - - public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(JSONObject jsonObject); - } - - public interface PlayMsgErrorCallback{ - void handler(WVPResult wvpResult); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class); - logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg)); - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { - continue; - } - if (WvpRedisMsg.isRequest(wvpRedisMsg)) { - logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody())); - - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class); - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: - RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); - requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - default: - break; - } - - }else { - logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody())); - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - - WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); - - String key = wvpRedisMsg.getSerial(); - switch (content.getCode()) { - case 0: - ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); - PlayMsgCallback playMsgCallback = callbacks.get(key); - if (playMsgCallback != null) { - callbacksForError.remove(key); - try { - playMsgCallback.handler(responseSendItemMsg); - } catch (ParseException e) { - logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e); - } - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(key); - if (errorCallback != null) { - callbacks.remove(key); - errorCallback.handler(content); - } - break; - default: - break; - } - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); - String serial = wvpRedisMsg.getSerial(); - switch (wvpResult.getCode()) { - case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); - if (playMsgCallback != null) { - callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); - if (errorCallback != null) { - callbacks.remove(serial); - errorCallback.handler(wvpResult); - } - break; - default: - break; - } - break; - default: - break; - } - - } - }catch (Exception e) { - logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[RedisGbPlayMsg] 寮傚父鍐呭锛� ", e); - } - } - }); - } - } - - /** - * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 - */ - private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaInfo == null) { - // TODO 鍥炲閿欒 - return; - } - String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",requestPushStreamMsg.getApp()); - param.put("stream",requestPushStreamMsg.getStream()); - param.put("ssrc", requestPushStreamMsg.getSsrc()); - param.put("dst_url",requestPushStreamMsg.getIp()); - param.put("dst_port", requestPushStreamMsg.getPort()); - param.put("is_udp", is_Udp); - param.put("src_port", requestPushStreamMsg.getSrcPort()); - param.put("pt", requestPushStreamMsg.getPt()); - param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); - param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); - // 鍥炲娑堟伅 - responsePushStream(jsonObject, fromId, serial); - } - - private void responsePushStream(JSONObject content, String toId, String serial) { - - WVPResult<JSONObject> result = new WVPResult<>(); - result.setCode(0); - result.setData(content); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� - */ - private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); - if (mediaServerItem == null) { - logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); - - WVPResult<SendRtpItem> result = new WVPResult<>(); - result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); - result.setMsg("娴佸獟浣撲笉瀛樺湪"); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - return; - } - // 纭畾娴佹槸鍚﹀湪绾� - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); - if (streamReady != null && streamReady) { - logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); - responseSendItem(mediaServerItem, content, toId, serial); - }else { - // 娴佸凡缁忕绾� - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream()); - - String taskKey = UUID.randomUUID().toString(); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(taskKey, ()->{ - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream()); - WVPResult<SendRtpItem> result = new WVPResult<>(); - result.setCode(ERROR_CODE_TIMEOUT); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) - ); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - }, userSetting.getPlatformPlayTimeout()); - - // 娣诲姞璁㈤槄 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ - dynamicTask.stop(taskKey); - responseSendItem(mediaServerItem, content, toId, serial); - }); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), - content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), - content.getMediaServerId()); - - String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; - logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚姹� {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); - redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); - } - } - - /** - * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� - */ - private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), - content.getPort(), content.getSsrc(), content.getPlatformId(), - content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp()); - - WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); - result.setCode(0); - ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); - responseSendItemMsg.setSendRtpItem(sendRtpItem); - responseSendItemMsg.setMediaServerItem(mediaServerItem); - result.setData(responseSendItemMsg); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) - ); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭� - * @param serverId 涓嬬骇鏈嶅姟ID - * @param app 搴旂敤鍚� - * @param stream 娴両D - * @param ip 鐩爣IP - * @param port 鐩爣绔彛 - * @param ssrc ssrc - * @param platformId 骞冲彴鍥芥爣缂栧彿 - * @param channelId 閫氶亾ID - * @param isTcp 鏄惁浣跨敤TCP - * @param callback 寰楀埌淇℃伅鐨勫洖璋� - */ - public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, - String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { - RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( - serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName); - requestSendItemMsg.setServerId(serverId); - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, - key, JSON.toJSONString(requestSendItemMsg)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject); - callbacks.put(key, callback); - callbacksForError.put(key, errorCallback); - dynamicTask.startDelay(key, ()->{ - callbacks.remove(key); - callbacksForError.remove(key); - WVPResult<Object> wvpResult = new WVPResult<>(); - wvpResult.setCode(ERROR_CODE_TIMEOUT); - wvpResult.setMsg("timeout"); - errorCallback.handler(wvpResult); - }, userSetting.getPlatformPlayTimeout()); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 鍙戦�佽姹傛帹娴佺殑娑堟伅 - * @param param 鎺ㄦ祦鍙傛暟 - * @param callback 鍥炶皟 - */ - public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject); - dynamicTask.startDelay(key, ()->{ - callbacksForStartSendRtpStream.remove(key); - callbacksForError.remove(key); - }, userSetting.getPlatformPlayTimeout()); - callbacksForStartSendRtpStream.put(key, callback); - callbacksForError.put(key, (wvpResult)->{ - logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg()); - callbacksForStartSendRtpStream.remove(key); - callbacksForError.remove(key); - }); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 鍙戦�佽姹傛帹娴佺殑娑堟伅 - */ - public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { - if (platformGbId == null) { - platformGbId = "*"; - } - if (channelId == null) { - channelId = "*"; - } - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List<Object> scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); - }else { - return null; - } - } - - /** - * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 - */ - private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { - SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); - if (sendRtpItem == null) { - logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL"); - return; - } - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaInfo == null) { - // TODO 鍥炲閿欒 - return; - } - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - - if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { - logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - // 鍙戦�乺edis娑堟伅 - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java new file mode 100755 index 0000000..14a96e8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java @@ -0,0 +1,81 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 鏀跺埌娑堟伅鍚庡紑濮嬬粰涓婄骇鍙戞祦 + * @author lin + */ +@Component +public class RedisPlatformStartSendRtpListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class); + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); + if (messageForPushChannel == null + || ObjectUtils.isEmpty(messageForPushChannel.getApp()) + || ObjectUtils.isEmpty(messageForPushChannel.getStream()) + || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ + continue; + } + + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", + null); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + + }); + + + }catch (Exception e) { + logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); + } + } + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java new file mode 100755 index 0000000..f3b415d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java @@ -0,0 +1,81 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅 + * @author lin + */ +@Component +public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class); + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); + if (messageForPushChannel == null + || ObjectUtils.isEmpty(messageForPushChannel.getApp()) + || ObjectUtils.isEmpty(messageForPushChannel.getStream()) + || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ + continue; + } + + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", + null); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + + }); + + + }catch (Exception e) { + logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); + } + } + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 66db103..1e5e93d 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -217,4 +217,7 @@ void sendPushStreamClose(MessageForPushChannel messageForPushChannel); + void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout); + + void sendStartSendRtp(SendRtpItem sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 1eac4df..60084df 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -678,4 +678,16 @@ logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鍋滄鍚戜笂绾ф帹娴� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } + + @Override + public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + redisTemplate.opsForValue().set(key, platformPlayTimeout); + } + + @Override + public void sendStartSendRtp(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); + } } -- Gitblit v1.8.0