From ce950dea4aef933a12e78e4fe1f535ba9a83c3df Mon Sep 17 00:00:00 2001 From: leesam <leesam@leesam.cn> Date: 星期三, 10 四月 2024 22:18:40 +0800 Subject: [PATCH] Merge branch 'refs/heads/master' into develop-add-api-key --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 121 ++++++++++++++++++++++++---------------- 1 files changed, 73 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index fb6f068..68595a8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -5,12 +5,12 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -27,7 +27,6 @@ import org.springframework.stereotype.Component; import java.text.ParseException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -61,9 +60,9 @@ */ public static final int ERROR_CODE_TIMEOUT = -3; - private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); - private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); @Autowired private UserSetting userSetting; @@ -71,9 +70,6 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -87,9 +83,9 @@ @Autowired - private ZlmHttpHookSubscribe subscribe; + private HookSubscribe subscribe; - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -101,7 +97,7 @@ } public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(JSONObject jsonObject); + void handler(); } public interface PlayMsgErrorCallback{ @@ -133,7 +129,10 @@ case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - + break; + case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: + RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); + requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -178,11 +177,10 @@ String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); + playMsgCallback.handler(); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: @@ -216,36 +214,24 @@ * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 */ private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaInfo == null) { + MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaServer == null) { // TODO 鍥炲閿欒 return; } - String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",requestPushStreamMsg.getApp()); - param.put("stream",requestPushStreamMsg.getStream()); - param.put("ssrc", requestPushStreamMsg.getSsrc()); - param.put("dst_url",requestPushStreamMsg.getIp()); - param.put("dst_port", requestPushStreamMsg.getPort()); - param.put("is_udp", is_Udp); - param.put("src_port", requestPushStreamMsg.getSrcPort()); - param.put("pt", requestPushStreamMsg.getPt()); - param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); - param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); + SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg); + + try { + mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + }catch (ControllerException e) { + return; + } + // 鍥炲娑堟伅 - responsePushStream(jsonObject, fromId, serial); - } - - private void responsePushStream(JSONObject content, String toId, String serial) { - WVPResult<JSONObject> result = new WVPResult<>(); result.setCode(0); - result.setData(content); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); @@ -255,7 +241,7 @@ * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� */ private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); if (mediaServerItem == null) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); @@ -271,7 +257,7 @@ return; } // 纭畾娴佹槸鍚﹀湪绾� - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); if (streamReady != null && streamReady) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); responseSendItem(mediaServerItem, content, toId, serial); @@ -294,9 +280,8 @@ }, userSetting.getPlatformPlayTimeout()); // 娣诲姞璁㈤槄 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ + Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId()); + subscribe.addSubscribe(hook, (hookData)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); @@ -314,8 +299,8 @@ /** * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� */ - private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp(), content.getRtcp()); @@ -397,6 +382,19 @@ redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } + /** + * 鍙戦�佽姹傛帹娴佺殑娑堟伅 + */ + public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { if (platformGbId == null) { platformGbId = "*"; @@ -423,4 +421,31 @@ return null; } } + + /** + * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 + */ + private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { + SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); + if (sendRtpItem == null) { + logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL"); + return; + } + MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo == null) { + // TODO 鍥炲閿欒 + return; + } + + if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) { + logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鍙戦�乺edis娑堟伅 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + + } } -- Gitblit v1.8.0