From 2b1f7a47394363e95deb4dfa0f1c67d41e747f7f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 01 二月 2023 10:56:40 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' into fix-269 --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 393 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 393 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java new file mode 100644 index 0000000..35ed99e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -0,0 +1,393 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾� + * @author lin + */ +@Component +public class RedisGbPlayMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); + + public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; + + /** + * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥 + */ + public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; + + /** + * 绂荤嚎鐨勯敊璇帥 + */ + public static final int ERROR_CODE_OFFLINE = -2; + + /** + * 瓒呮椂鐨勯敊璇帥 + */ + public static final int ERROR_CODE_TIMEOUT = -3; + + private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); + private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); + + @Autowired + private UserSetting userSetting; + + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private ZLMMediaListManager mediaListManager; + + @Autowired + private ZlmHttpHookSubscribe subscribe; + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + public interface PlayMsgCallback{ + void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; + } + + public interface PlayMsgCallbackForStartSendRtpStream{ + void handler(JSONObject jsonObject); + } + + public interface PlayMsgErrorCallback{ + void handler(WVPResult wvpResult); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); + WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { + continue; + } + if (WvpRedisMsg.isRequest(wvpRedisMsg)) { + logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody())); + + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + default: + break; + } + + }else { + logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody())); + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + + WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); + + String key = wvpRedisMsg.getSerial(); + switch (content.getCode()) { + case 0: + ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); + PlayMsgCallback playMsgCallback = callbacks.get(key); + if (playMsgCallback != null) { + callbacksForError.remove(key); + try { + playMsgCallback.handler(responseSendItemMsg); + } catch (ParseException e) { + logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e); + } + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(key); + if (errorCallback != null) { + callbacks.remove(key); + errorCallback.handler(content); + } + break; + default: + break; + } + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); + String serial = wvpRedisMsg.getSerial(); + switch (wvpResult.getCode()) { + case 0: + JSONObject jsonObject = (JSONObject)wvpResult.getData(); + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); + if (playMsgCallback != null) { + callbacksForError.remove(serial); + playMsgCallback.handler(jsonObject); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); + if (errorCallback != null) { + callbacks.remove(serial); + errorCallback.handler(wvpResult); + } + break; + default: + break; + } + break; + default: + break; + } + + } + }catch (Exception e) { + logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, {}",e.getMessage()); + } + } + }); + } + } + + /** + * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 + */ + private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { + MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaInfo == null) { + // TODO 鍥炲閿欒 + return; + } + String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",requestPushStreamMsg.getApp()); + param.put("stream",requestPushStreamMsg.getStream()); + param.put("ssrc", requestPushStreamMsg.getSsrc()); + param.put("dst_url",requestPushStreamMsg.getIp()); + param.put("dst_port", requestPushStreamMsg.getPort()); + param.put("is_udp", is_Udp); + param.put("src_port", requestPushStreamMsg.getSrcPort()); + param.put("pt", requestPushStreamMsg.getPt()); + param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); + param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + // 鍥炲娑堟伅 + responsePushStream(jsonObject, fromId, serial); + } + + private void responsePushStream(JSONObject content, String toId, String serial) { + + WVPResult<JSONObject> result = new WVPResult<>(); + result.setCode(0); + result.setData(content); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� + */ + private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { + MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); + + WVPResult<SendRtpItem> result = new WVPResult<>(); + result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); + result.setMsg("娴佸獟浣撲笉瀛樺湪"); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + return; + } + // 纭畾娴佹槸鍚﹀湪绾� + boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + if (streamReady) { + logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); + responseSendItem(mediaServerItem, content, toId, serial); + }else { + // 娴佸凡缁忕绾� + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream()); + + String taskKey = UUID.randomUUID().toString(); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(taskKey, ()->{ + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream()); + WVPResult<SendRtpItem> result = new WVPResult<>(); + result.setCode(ERROR_CODE_TIMEOUT); + WvpRedisMsg response = WvpRedisMsg.getResponseInstance( + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + ); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + }, userSetting.getPlatformPlayTimeout()); + + // 娣诲姞璁㈤槄 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); + + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + dynamicTask.stop(taskKey); + responseSendItem(mediaServerItem, content, toId, serial); + }); + + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), + content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), + content.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + + } + } + + /** + * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� + */ + private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + content.getPort(), content.getSsrc(), content.getPlatformId(), + content.getApp(), content.getStream(), content.getChannelId(), + content.getTcp(), content.getRtcp()); + + WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); + result.setCode(0); + ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); + responseSendItemMsg.setSendRtpItem(sendRtpItem); + responseSendItemMsg.setMediaServerItem(mediaServerItem); + result.setData(responseSendItemMsg); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance( + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + ); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭� + * @param serverId 涓嬬骇鏈嶅姟ID + * @param app 搴旂敤鍚� + * @param stream 娴両D + * @param ip 鐩爣IP + * @param port 鐩爣绔彛 + * @param ssrc ssrc + * @param platformId 骞冲彴鍥芥爣缂栧彿 + * @param channelId 閫氶亾ID + * @param isTcp 鏄惁浣跨敤TCP + * @param callback 寰楀埌淇℃伅鐨勫洖璋� + */ + public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, + String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { + RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( + serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName); + requestSendItemMsg.setServerId(serverId); + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, + key, requestSendItemMsg); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject); + callbacks.put(key, callback); + callbacksForError.put(key, errorCallback); + dynamicTask.startDelay(key, ()->{ + callbacks.remove(key); + callbacksForError.remove(key); + WVPResult<Object> wvpResult = new WVPResult<>(); + wvpResult.setCode(ERROR_CODE_TIMEOUT); + wvpResult.setMsg("timeout"); + errorCallback.handler(wvpResult); + }, userSetting.getPlatformPlayTimeout()); + RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 鍙戦�佽姹傛帹娴佺殑娑堟伅 + * @param param 鎺ㄦ祦鍙傛暟 + * @param callback 鍥炶皟 + */ + public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject); + dynamicTask.startDelay(key, ()->{ + callbacksForStartSendRtpStream.remove(key); + callbacksForError.remove(key); + }, userSetting.getPlatformPlayTimeout()); + callbacksForStartSendRtpStream.put(key, callback); + callbacksForError.put(key, (wvpResult)->{ + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg()); + callbacksForStartSendRtpStream.remove(key); + callbacksForError.remove(key); + }); + RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } +} -- Gitblit v1.8.0