From bf6e09d231f49fb0c2cd5a81f6b31cc64d27c368 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 17 四月 2024 12:56:22 +0800 Subject: [PATCH] 修复多wvp国标级联机制 --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 18 ++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 73 +++++------ src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 15 +- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 37 ++--- src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java | 5 src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java | 57 +++++++++ src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 7 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 76 ++++++++++++ src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java | 17 ++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java | 2 src/main/resources/application.yml | 2 12 files changed, 237 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index 5ddaed3..dcf2830 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -42,6 +42,9 @@ @Autowired private RedisRpcConfig redisRpcConfig; + @Autowired + private RedisPushStreamResponseListener redisPushStreamCloseResponseListener; + /** * redis娑堟伅鐩戝惉鍣ㄥ鍣� 鍙互娣诲姞澶氫釜鐩戝惉涓嶅悓璇濋鐨剅edis鐩戝惉鍣紝鍙渶瑕佹妸娑堟伅鐩戝惉鍣ㄥ拰鐩稿簲鐨勬秷鎭闃呭鐞嗗櫒缁戝畾锛岃娑堟伅鐩戝惉鍣� @@ -61,6 +64,7 @@ container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); + container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 2c134fd..2e0ecf5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -15,9 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -135,29 +133,22 @@ logger.info("[鏀跺埌bye] 鍋滄鎺ㄦ祦锛歿}, 濯掍綋鑺傜偣锛� {}", streamId, sendRtpItem.getMediaServerId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - // 鏌ヨ杩欒矾娴佹槸鍚︽槸鏈钩鍙扮殑 - StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - redisRpcService.stopSendRtp(sendRtpItem); - }else { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), - callIdHeader.getCallId(), null); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); - if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); - } - - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(platform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); + if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { + redisRpcService.stopSendRtp(sendRtpItem); }else { - logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId()); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + callIdHeader.getCallId(), null); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); + if (userSetting.getUseCustomSsrcForParentInvite()) { + mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); + } } + }else { + logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId()); } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 59ff50c..a51dd37 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -125,6 +126,9 @@ @Autowired private SendRtpPortManager sendRtpPortManager; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Override @@ -759,6 +763,7 @@ redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 璁剧疆瓒呮椂 dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 @@ -801,7 +806,18 @@ // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItemFromRedis, request, platform); } - + }); + // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(sendRtpItem.getCallId()); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); + } + } }); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 32bf76a..2a45ad8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -10,19 +10,22 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -77,6 +80,10 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + + + @Autowired + private IRedisRpcService redisRpcService; @Autowired private IInviteStreamService inviteStreamService; @@ -511,41 +518,33 @@ } if (sendRtpItem.getApp().equals(param.getApp())) { - logger.info(sendRtpItem.toString()); - if (userSetting.getServerId().equals(sendRtpItem.getServerId())) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId()); - // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦 - redisCatchStorage.sendPushStreamClose(messageForPushChannel); - }else { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); - - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� - logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - } + ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + Device device = deviceService.getDevice(sendRtpItem.getPlatformId()); + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); + } else if (device != null) { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� + logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); } } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); + }else { + // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦 + redisRpcService.rtpSendStopped(sendRtpItem); } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } - } } } @@ -593,11 +592,7 @@ redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index ec24abe..b503fab 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -372,6 +372,6 @@ param.put("app", sendRtpItem.getApp()); param.put("stream", sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); - return zlmresTfulUtils.startSendRtp(mediaServerItem, param); + return zlmresTfulUtils.stopSendRtp(mediaServerItem, param); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index a601ae9..8d1b7f0 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -13,4 +13,9 @@ void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback); WVPResult stopSendRtp(SendRtpItem sendRtpItem); + + void stopWaitePushStreamOnline(SendRtpItem sendRtpItem); + + void rtpSendStopped(SendRtpItem sendRtpItem); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java new file mode 100644 index 0000000..c90771b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -0,0 +1,76 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋� + * @author lin + */ +@Component +public class RedisPushStreamResponseListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); + + public interface PushStreamResponseEvent{ + void run(MessageForPushChannelResponse response); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ + logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�"); + continue; + } + // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅 + if (responseEvents.get(response.getApp() + response.getStream()) != null) { + responseEvents.get(response.getApp() + response.getStream()).run(response); + } + }catch (Exception e) { + logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); + } + } + }); + } + } + + public void addEvent(String app, String stream, PushStreamResponseEvent callback) { + responseEvents.put(app + stream, callback); + } + + public void removeEvent(String app, String stream) { + responseEvents.remove(app + stream); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 7a81eab..682f564 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -7,8 +7,10 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -18,12 +20,17 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; /** * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢� @@ -57,6 +64,14 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; + + + @Autowired + private ISIPCommanderForPlatform commanderFroPlatform; + + + @Autowired + private IVideoManagerStorage storager; /** @@ -133,6 +148,20 @@ return null; } + /** + * 鍋滄鐩戝惉娴佷笂绾� + */ + public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + hookSubscribe.removeSubscribe(hook); + return null; + } + /** * 寮�濮嬪彂娴� @@ -194,6 +223,34 @@ return response; } + /** + * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜� + */ + public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId()); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItemInCatch == null) { + return response; + } + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + if (platform == null) { + return response; + } + try { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); + } + return response; + } + private void sendResponse(RedisRpcResponse response){ response.setToId(userSetting.getServerId()); RedisRpcMessage message = new RedisRpcMessage(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index e9a00a9..f11b9aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -97,4 +97,21 @@ }); } + + @Override + public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) { + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + hookSubscribe.removeSubscribe(hook); + RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + redisRpcConfig.request(request, 10); + } + + @Override + public void rtpSendStopped(SendRtpItem sendRtpItem) { + RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + redisRpcConfig.request(request, 10); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 108bc17..0cf6c39 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -2,10 +2,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.SystemAllInfo; -import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -207,7 +204,7 @@ void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel); - void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel); + void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform); void addPushListItem(String app, String stream, OnStreamChangedHookParam param); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 60ebfab..d1e8052 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -5,10 +5,7 @@ import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -644,9 +641,15 @@ } @Override - public void sendPlatformStopPlayMsg(MessageForPushChannel msg) { + public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) { + + MessageForPushChannel msg = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + msg.setPlatFormIndex(platform.getId()); + String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3f47844..69f947e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ application: name: wvp profiles: - active: local \ No newline at end of file + active: local2 \ No newline at end of file -- Gitblit v1.8.0