From b45d71ba6d7474dc21dfa54df37876429bf2ec46 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 10 四月 2024 22:56:14 +0800 Subject: [PATCH] Merge pull request #1389 from ancienter/develop-add-api-key --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 85 +++++++++++++++--------------------------- 1 files changed, 31 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 29bd734..68595a8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -5,12 +5,12 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -27,7 +27,6 @@ import org.springframework.stereotype.Component; import java.text.ParseException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -61,9 +60,9 @@ */ public static final int ERROR_CODE_TIMEOUT = -3; - private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); - private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); @Autowired private UserSetting userSetting; @@ -71,9 +70,6 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -87,9 +83,9 @@ @Autowired - private ZlmHttpHookSubscribe subscribe; + private HookSubscribe subscribe; - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -101,7 +97,7 @@ } public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(JSONObject jsonObject); + void handler(); } public interface PlayMsgErrorCallback{ @@ -181,11 +177,10 @@ String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); + playMsgCallback.handler(); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: @@ -219,36 +214,24 @@ * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 */ private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaInfo == null) { + MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaServer == null) { // TODO 鍥炲閿欒 return; } - String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",requestPushStreamMsg.getApp()); - param.put("stream",requestPushStreamMsg.getStream()); - param.put("ssrc", requestPushStreamMsg.getSsrc()); - param.put("dst_url",requestPushStreamMsg.getIp()); - param.put("dst_port", requestPushStreamMsg.getPort()); - param.put("is_udp", is_Udp); - param.put("src_port", requestPushStreamMsg.getSrcPort()); - param.put("pt", requestPushStreamMsg.getPt()); - param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); - param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); + SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg); + + try { + mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + }catch (ControllerException e) { + return; + } + // 鍥炲娑堟伅 - responsePushStream(jsonObject, fromId, serial); - } - - private void responsePushStream(JSONObject content, String toId, String serial) { - WVPResult<JSONObject> result = new WVPResult<>(); result.setCode(0); - result.setData(content); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); @@ -258,7 +241,7 @@ * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� */ private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); if (mediaServerItem == null) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); @@ -274,7 +257,7 @@ return; } // 纭畾娴佹槸鍚﹀湪绾� - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); if (streamReady != null && streamReady) { logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); responseSendItem(mediaServerItem, content, toId, serial); @@ -297,9 +280,8 @@ }, userSetting.getPlatformPlayTimeout()); // 娣诲姞璁㈤槄 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ + Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId()); + subscribe.addSubscribe(hook, (hookData)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); @@ -317,8 +299,8 @@ /** * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� */ - private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp(), content.getRtcp()); @@ -449,18 +431,13 @@ logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL"); return; } - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaInfo == null) { // TODO 鍥炲閿欒 return; } - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { + if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) { logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 鍙戦�乺edis娑堟伅 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, -- Gitblit v1.8.0