From 9978d30aa1533c50bfaa48e44a1b0de4c157d10b Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 11 四月 2024 09:36:48 +0800 Subject: [PATCH] Merge branch '2.7.0' --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 172 ++++++++++++++++++++++++++++++++++++++------------------ 1 files changed, 116 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java old mode 100644 new mode 100755 index 7399b2a..68595a8 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -5,14 +5,16 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +27,7 @@ import org.springframework.stereotype.Component; import java.text.ParseException; -import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -58,9 +60,9 @@ */ public static final int ERROR_CODE_TIMEOUT = -3; - private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); - private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); @Autowired private UserSetting userSetting; @@ -70,10 +72,10 @@ private RedisTemplate<Object, Object> redisTemplate; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private IMediaServerService mediaServerService; @Autowired - private IMediaServerService mediaServerService; + private IRedisCatchStorage redisCatchStorage; @Autowired @@ -81,9 +83,9 @@ @Autowired - private ZlmHttpHookSubscribe subscribe; + private HookSubscribe subscribe; - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -95,7 +97,7 @@ } public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(JSONObject jsonObject); + void handler(); } public interface PlayMsgErrorCallback{ @@ -111,8 +113,8 @@ while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { - JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); - WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); + WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class); + logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg)); if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { continue; } @@ -121,12 +123,16 @@ switch (wvpRedisMsg.getCmd()){ case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); + RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class); requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: + RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); + requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -171,11 +177,10 @@ String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); + playMsgCallback.handler(); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: @@ -209,37 +214,25 @@ * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 */ private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaInfo == null) { + MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaServer == null) { // TODO 鍥炲閿欒 return; } - String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",requestPushStreamMsg.getApp()); - param.put("stream",requestPushStreamMsg.getStream()); - param.put("ssrc", requestPushStreamMsg.getSsrc()); - param.put("dst_url",requestPushStreamMsg.getIp()); - param.put("dst_port", requestPushStreamMsg.getPort()); - param.put("is_udp", is_Udp); - param.put("src_port", requestPushStreamMsg.getSrcPort()); - param.put("pt", requestPushStreamMsg.getPt()); - param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); - param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg); + + try { + mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + }catch (ControllerException e) { + return; + } + // 鍥炲娑堟伅 - responsePushStream(jsonObject, fromId, serial); - } - - private void responsePushStream(JSONObject content, String toId, String serial) { - WVPResult<JSONObject> result = new WVPResult<>(); result.setCode(0); - result.setData(content); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } @@ -248,7 +241,7 @@ * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� */ private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); if (mediaServerItem == null) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); @@ -257,14 +250,14 @@ result.setMsg("娴佸獟浣撲笉瀛樺湪"); WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); + WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); return; } // 纭畾娴佹槸鍚﹀湪绾� - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); if (streamReady != null && streamReady) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); responseSendItem(mediaServerItem, content, toId, serial); @@ -280,16 +273,15 @@ WVPResult<SendRtpItem> result = new WVPResult<>(); result.setCode(ERROR_CODE_TIMEOUT); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); }, userSetting.getPlatformPlayTimeout()); // 娣诲姞璁㈤槄 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId()); + subscribe.addSubscribe(hook, (hookData)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); @@ -307,8 +299,8 @@ /** * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� */ - private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp(), content.getRtcp()); @@ -319,9 +311,10 @@ responseSendItemMsg.setSendRtpItem(sendRtpItem); responseSendItemMsg.setMediaServerItem(mediaServerItem); result.setData(responseSendItemMsg); + redisCatchStorage.updateSendRTPSever(sendRtpItem); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); @@ -347,7 +340,7 @@ requestSendItemMsg.setServerId(serverId); String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, - key, requestSendItemMsg); + key, JSON.toJSONString(requestSendItemMsg)); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject); @@ -372,7 +365,7 @@ public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param)); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject); @@ -388,4 +381,71 @@ }); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } + + /** + * 鍙戦�佽姹傛帹娴佺殑娑堟伅 + */ + public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { + if (platformGbId == null) { + platformGbId = "*"; + } + if (channelId == null) { + channelId = "*"; + } + if (streamId == null) { + streamId = "*"; + } + if (callId == null) { + callId = "*"; + } + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*_" + + platformGbId + "_" + + channelId + "_" + + streamId + "_" + + callId; + List<Object> scan = RedisUtil.scan(redisTemplate, key); + if (scan.size() > 0) { + return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); + }else { + return null; + } + } + + /** + * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 + */ + private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { + SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); + if (sendRtpItem == null) { + logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL"); + return; + } + MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo == null) { + // TODO 鍥炲閿欒 + return; + } + + if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) { + logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鍙戦�乺edis娑堟伅 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + + } } -- Gitblit v1.8.0