From ecd1d2a414f650987579ac95ebdf848cd98d7af0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 01 四月 2024 15:22:45 +0800 Subject: [PATCH] 临时提交 --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 163 ++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 121 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java old mode 100644 new mode 100755 index c372647..d14cea5 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -2,16 +2,16 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; @@ -22,11 +22,13 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.text.ParseException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -59,19 +61,19 @@ */ public static final int ERROR_CODE_TIMEOUT = -3; - private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); - private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); @Autowired private UserSetting userSetting; @Autowired - private ZLMMediaListManager zlmMediaListManager; + private RedisTemplate<Object, Object> redisTemplate; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -79,16 +81,15 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired private DynamicTask dynamicTask; - @Autowired - private ZLMMediaListManager mediaListManager; @Autowired - private ZlmHttpHookSubscribe subscribe; + private HookSubscribe subscribe; - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -116,8 +117,8 @@ while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { - JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); - WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); + WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class); + logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg)); if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { continue; } @@ -126,12 +127,16 @@ switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); + RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class); requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: + RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); + requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -214,7 +219,7 @@ * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 */ private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + MediaServer mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); if (mediaInfo == null) { // TODO 鍥炲閿欒 return; @@ -232,7 +237,7 @@ param.put("pt", requestPushStreamMsg.getPt()); param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); // 鍥炲娑堟伅 responsePushStream(jsonObject, fromId, serial); } @@ -244,16 +249,16 @@ result.setData(content); WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� */ private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); if (mediaServerItem == null) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); @@ -262,15 +267,15 @@ result.setMsg("娴佸獟浣撲笉瀛樺湪"); WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); + WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); return; } // 纭畾娴佹槸鍚﹀湪绾� - boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); - if (streamReady) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + if (streamReady != null && streamReady) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); responseSendItem(mediaServerItem, content, toId, serial); }else { @@ -285,16 +290,15 @@ WVPResult<SendRtpItem> result = new WVPResult<>(); result.setCode(ERROR_CODE_TIMEOUT); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); }, userSetting.getPlatformPlayTimeout()); // 娣诲姞璁㈤槄 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId()); + subscribe.addSubscribe(hook, (hookData)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); @@ -302,16 +306,18 @@ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), content.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; + logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚姹� {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); + redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); } } /** * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� */ - private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp(), content.getRtcp()); @@ -322,12 +328,13 @@ responseSendItemMsg.setSendRtpItem(sendRtpItem); responseSendItemMsg.setMediaServerItem(mediaServerItem); result.setData(responseSendItemMsg); + redisCatchStorage.updateSendRTPSever(sendRtpItem); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** @@ -350,7 +357,7 @@ requestSendItemMsg.setServerId(serverId); String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, - key, requestSendItemMsg); + key, JSON.toJSONString(requestSendItemMsg)); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject); @@ -364,7 +371,7 @@ wvpResult.setMsg("timeout"); errorCallback.handler(wvpResult); }, userSetting.getPlatformPlayTimeout()); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** @@ -375,7 +382,7 @@ public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param)); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject); @@ -389,6 +396,78 @@ callbacksForStartSendRtpStream.remove(key); callbacksForError.remove(key); }); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 鍙戦�佽姹傛帹娴佺殑娑堟伅 + */ + public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { + if (platformGbId == null) { + platformGbId = "*"; + } + if (channelId == null) { + channelId = "*"; + } + if (streamId == null) { + streamId = "*"; + } + if (callId == null) { + callId = "*"; + } + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*_" + + platformGbId + "_" + + channelId + "_" + + streamId + "_" + + callId; + List<Object> scan = RedisUtil.scan(redisTemplate, key); + if (scan.size() > 0) { + return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); + }else { + return null; + } + } + + /** + * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 + */ + private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { + SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); + if (sendRtpItem == null) { + logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL"); + return; + } + MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo == null) { + // TODO 鍥炲閿欒 + return; + } + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + + if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { + logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鍙戦�乺edis娑堟伅 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + } } -- Gitblit v1.8.0