From cc793d16f5114122304573e452bcef9dd23d32c2 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 28 三月 2024 18:35:28 +0800 Subject: [PATCH] 调整hook订阅通知的位置 --- src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 338 +++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 215 insertions(+), 123 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index ca16bfc..330497f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,14 +1,9 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionStatus; -import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ServiceException; @@ -18,40 +13,29 @@ import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.*; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.event.HookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; -import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; import com.genersoft.iot.vmp.utils.CloudRecordUtils; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; @@ -62,9 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -111,13 +94,10 @@ private IInviteStreamService inviteStreamService; @Autowired - private ZlmHttpHookSubscribe subscribe; + private HookSubscribe subscribe; @Autowired private SendRtpPortManager sendRtpPortManager; - - @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; @Autowired private IMediaService mediaService; @@ -135,37 +115,137 @@ private IDeviceChannelService channelService; @Autowired - private SipConfig sipConfig; - - @Autowired private DynamicTask dynamicTask; @Autowired - private CloudRecordServiceMapper cloudRecordServiceMapper; - - @Autowired private ISIPCommanderForPlatform commanderForPlatform; - - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Autowired - private ZlmHttpHookSubscribe hookSubscribe; - - @Autowired private SSRCFactory ssrcFactory; - @Autowired - private RedisTemplate<Object, Object> redisTemplate; + /** + * 娴佸埌鏉ョ殑澶勭悊 + */ + @Async("taskExecutor") + @org.springframework.context.event.EventListener + public void onApplicationEvent(MediaArrivalEvent event) { + if ("broadcast".equals(event.getApp())) { + if (event.getStream().indexOf("_") > 0) { + String[] streamArray = event.getStream().split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.getDevice(deviceId); + if (device == null) { + logger.info("[璇煶瀵硅/鍠婅瘽] 鏈壘鍒拌澶囷細{}", deviceId); + return; + } + if ("broadcast".equals(event.getApp())) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + stopAudioBroadcast(deviceId, channelId); + } + // 寮�鍚闊冲璁查�氶亾 + try { + audioBroadcastCmd(device, channelId, event.getMediaServer(), + event.getApp(), event.getStream(), 60, false, (msg) -> { + logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId); + }); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[鍛戒护鍙戦�佸け璐 璇煶瀵硅: {}", e.getMessage()); + } + }else if ("talk".equals(event.getApp())) { + // 寮�鍚闊冲璁查�氶亾 + talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> { + logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId); + }); + } + } + } + } + + + } + + /** + * 娴佺寮�鐨勫鐞� + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { + if (event.getStream().indexOf("_") > 0) { + String[] streamArray = event.getStream().split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.getDevice(deviceId); + if (device == null) { + logger.info("[璇煶瀵硅/鍠婅瘽] 鏈壘鍒拌澶囷細{}", deviceId); + return; + } + if ("broadcast".equals(event.getApp())) { + stopAudioBroadcast(deviceId, channelId); + }else if ("talk".equals(event.getApp())) { + stopTalk(device, channelId, false); + } + } + } + } + } + + /** + * 娴佹湭鎵惧埌鐨勫鐞� + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaNotFoundEvent event) { + if (!"rtp".equals(event.getApp())) { + return; + } + String[] s = event.getStream().split("_"); + if ((s.length != 2 && s.length != 4)) { + return; + } + String deviceId = s[0]; + String channelId = s[1]; + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + return; + } + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); + if (deviceChannel == null) { + return; + } + if (s.length == 2) { + logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream()); + play(event.getMediaServer(), deviceId, channelId, null, null); + } else if (s.length == 4) { + // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿 + String startTimeStr = s[2]; + String endTimeStr = s[3]; + if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) { + return; + } + String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr); + String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr); + logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}", + event.getMediaServer().getId(), event.getSchema(), + event.getApp(), event.getStream(), + startTime, endTime + ); + + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null, + device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); + playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null); + } + } @Override - public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) { + public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) { if (mediaServerItem == null) { logger.warn("[鐐规挱] 鏈壘鍒板彲鐢ㄧ殑zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm"); @@ -200,7 +280,7 @@ return inviteInfo.getSsrcInfo(); } String mediaServerId = streamInfo.getMediaServerId(); - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); + MediaServer mediaInfo = mediaServerService.getOne(mediaServerId); Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId); if (ready != null && ready) { @@ -219,7 +299,7 @@ } } } - String streamId = String.format("%s_%s", device.getDeviceId(), channelId);; + String streamId = String.format("%s_%s", device.getDeviceId(), channelId); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, false, device.getStreamModeForParam()); if (ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); @@ -233,8 +313,8 @@ return ssrcInfo; } - private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + private void talk(MediaServer mediaServerItem, Device device, String channelId, String stream, + HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) { String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); @@ -376,7 +456,7 @@ @Override - public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, + public void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, ErrorCallback<Object> callback) { if (mediaServerItem == null || ssrcInfo == null) { @@ -510,7 +590,7 @@ } private void tcpActiveHandler(Device device, String channelId, String contentString, - MediaServerItem mediaServerItem, + MediaServer mediaServerItem, String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){ if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { return; @@ -537,8 +617,23 @@ } } logger.info("[TCP涓诲姩杩炴帴瀵规柟] deviceId: {}, channelId: {}, 杩炴帴瀵规柟鐨勫湴鍧�锛歿}:{}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); - JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); - logger.info("[TCP涓诲姩杩炴帴瀵规柟] 缁撴灉锛� {}", jsonObject); + Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); + logger.info("[TCP涓诲姩杩炴帴瀵规柟] 缁撴灉锛� {}" , result); + if (!result) { + // 涓诲姩杩炴帴澶辫触锛岀粨鏉熸祦绋嬶紝 娓呯悊鏁版嵁 + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + + callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + inviteStreamService.call(InviteSessionType.BROADCAST, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + } } catch (SdpException e) { logger.error("[TCP涓诲姩杩炴帴瀵规柟] deviceId: {}, channelId: {}, 瑙f瀽200OK鐨凷DP淇℃伅澶辫触", device.getDeviceId(), channelId, e); dynamicTask.stop(timeOutTaskKey); @@ -550,7 +645,7 @@ callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + inviteStreamService.call(InviteSessionType.BROADCAST, device.getDeviceId(), channelId, null, InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); } @@ -564,7 +659,7 @@ * @param channelId 閫氶亾 ID * @param stream ssrc */ - private void snapOnPlay(MediaServerItem mediaServerItemInuse, String deviceId, String channelId, String stream) { + private void snapOnPlay(MediaServer mediaServerItemInuse, String deviceId, String channelId, String stream) { String streamUrl; if (mediaServerItemInuse.getRtspPort() != 0) { streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream); @@ -575,10 +670,10 @@ String fileName = deviceId + "_" + channelId + ".jpg"; // 璇锋眰鎴浘 logger.info("[璇锋眰鎴浘]: " + fileName); - zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); + mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); } - public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) { + public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, HookParam hookParam, String deviceId, String channelId) { StreamInfo streamInfo = null; Device device = redisCatchStorage.getDevice(deviceId); OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; @@ -600,7 +695,7 @@ } - private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) { + private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param; StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { @@ -624,11 +719,11 @@ } @Override - public MediaServerItem getNewMediaServerItem(Device device) { + public MediaServer getNewMediaServerItem(Device device) { if (device == null) { return null; } - MediaServerItem mediaServerItem; + MediaServer mediaServerItem; if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); } else { @@ -649,7 +744,7 @@ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒拌澶囷細" + deviceId); } - MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + MediaServer newMediaServerItem = getNewMediaServerItem(device); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) { logger.warn("[褰曞儚鍥炴斁] 鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦 deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦"); @@ -666,7 +761,7 @@ } @Override - public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, + public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback) { if (mediaServerItem == null || ssrcInfo == null) { @@ -717,7 +812,7 @@ inviteStreamService.removeInviteInfo(inviteInfo); }; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { + HookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + hookParam); dynamicTask.stop(playBackTimeOutTaskKey); StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); @@ -750,7 +845,7 @@ } - private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServerItem mediaServerItem, + private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServer mediaServerItem, Device device, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ inviteInfo.setStatus(InviteSessionStatus.ok); @@ -846,7 +941,7 @@ if (device == null) { return; } - MediaServerItem newMediaServerItem = this.getNewMediaServerItem(device); + MediaServer newMediaServerItem = this.getNewMediaServerItem(device); if (newMediaServerItem == null) { callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(), InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(), @@ -860,7 +955,7 @@ @Override - public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) { + public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -906,7 +1001,7 @@ streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); inviteStreamService.removeInviteInfo(inviteInfo); }; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { + HookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + hookParam); dynamicTask.stop(downLoadTimeOutTaskKey); StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); @@ -927,7 +1022,7 @@ downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD); // 娉ㄥ唽褰曞儚鍥炶皟浜嬩欢锛屽綍鍍忎笅杞界粨鏉熷悗鍐欏叆涓嬭浇鍦板潃 - ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> { + HookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> { logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅锛� 锛� {}/{}-{}", inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream()); logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅鍐呭锛� " + hookParam); @@ -973,7 +1068,7 @@ // 鑾峰彇褰撳墠宸蹭笅杞芥椂闀� String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId(); - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { logger.warn("[鑾峰彇涓嬭浇杩涘害] 鏌ヨ褰曞儚淇℃伅鏃跺彂鐜拌妭鐐逛笉瀛樺湪"); return null; @@ -984,30 +1079,13 @@ logger.warn("[鑾峰彇涓嬭浇杩涘害] 涓嬭浇宸茬粨鏉�"); return null; } - - JSONObject mediaListJson= zlmresTfulUtils.getMediaList(mediaServerItem, "rtp", stream); - if (mediaListJson == null) { - logger.warn("[鑾峰彇涓嬭浇杩涘害] 浠巣lm鏌ヨ杩涘害澶辫触"); + String app = "rtp"; + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, app, stream); + if (mediaInfo == null) { + logger.warn("[鑾峰彇涓嬭浇杩涘害] 鏌ヨ杩涘害澶辫触, 鑺傜偣Id锛� {}锛� {}/{}", mediaServerId, app, stream); return null; } - if (mediaListJson.getInteger("code") != 0) { - logger.warn("[鑾峰彇涓嬭浇杩涘害] 浠巣lm鏌ヨ杩涘害鍑虹幇閿欒锛� {}", mediaListJson.getString("msg")); - return null; - } - JSONArray data = mediaListJson.getJSONArray("data"); - if (data == null) { - logger.warn("[鑾峰彇涓嬭浇杩涘害] 浠巣lm鏌ヨ杩涘害鏃舵湭杩斿洖鏁版嵁"); - return null; - } - JSONObject mediaJSON = data.getJSONObject(0); - JSONArray tracks = mediaJSON.getJSONArray("tracks"); - if (tracks.isEmpty()) { - logger.warn("[鑾峰彇涓嬭浇杩涘害] 浠巣lm鏌ヨ杩涘害鏃舵湭杩斿洖鏁版嵁"); - return null; - } - JSONObject jsonObject = tracks.getJSONObject(0); - long duration = jsonObject.getLongValue("duration"); - if (duration == 0) { + if (mediaInfo.getDuration() == 0) { inviteInfo.getStreamInfo().setProgress(0); } else { String startTime = inviteInfo.getStreamInfo().getStartTime(); @@ -1016,7 +1094,7 @@ long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime); long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime); - BigDecimal currentCount = new BigDecimal(duration); + BigDecimal currentCount = new BigDecimal(mediaInfo.getDuration()); BigDecimal totalCount = new BigDecimal((end - start) * 1000); BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP); double process = divide.doubleValue(); @@ -1029,7 +1107,7 @@ return inviteInfo.getStreamInfo(); } - private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) { + private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { @@ -1048,8 +1126,9 @@ } - public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null); + public StreamInfo onPublishHandler(MediaServer mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { + MediaInfo mediaInfo = MediaInfo.getInstance(hookParam); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), mediaInfo, null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); return streamInfo; @@ -1105,7 +1184,7 @@ logger.warn("寮�鍚闊冲箍鎾殑鏃跺�欐湭鎵惧埌閫氶亾锛� {}", channelId); return null; } - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + MediaServer mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); if (broadcastMode == null) { broadcastMode = true; } @@ -1120,7 +1199,7 @@ } @Override - public boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { + public boolean audioBroadcastCmd(Device device, String channelId, MediaServer mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { return false; } @@ -1164,6 +1243,15 @@ // 鍙戦�佹垚鍔� AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform); audioBroadcastManager.update(audioBroadcastCatch); + // 绛夊緟invite娑堟伅锛� 瓒呮椂鍒欑粨鏉� + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId(); + if (!SipUtils.isFrontEnd(device.getDeviceId())) { + key += audioBroadcastCatch.getChannelId(); + } + dynamicTask.startDelay(key, ()->{ + logger.info("[璇煶骞挎挱]绛夊緟invite娑堟伅瓒呮椂锛歿}/{}", device.getDeviceId(), channelId); + stopAudioBroadcast(device.getDeviceId(), channelId); + }, 2000); }, eventResultForError -> { // 鍙戦�佸け璐� logger.error("璇煶骞挎挱鍙戦�佸け璐ワ細 {}:{}", channelId, eventResultForError.msg); @@ -1179,7 +1267,7 @@ SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬� - MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady) { logger.warn("璇煶骞挎挱閫氶亾浣跨敤涓細 {}", channelId); @@ -1209,12 +1297,8 @@ SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); if (sendRtpItem != null) { redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map<String, Object> param = new HashMap<>(); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStream()); - zlmresTfulUtils.stopSendRtp(mediaInfo, param); + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), null); try { cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); } catch (InvalidArgumentException | ParseException | SipException | @@ -1290,7 +1374,7 @@ } inviteInfo.getStreamInfo().setPause(true); inviteStreamService.updateInviteInfo(inviteInfo); - MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); if (null == mediaServerItem) { logger.warn("mediaServer 涓嶅瓨鍦�!"); throw new ServiceException("mediaServer涓嶅瓨鍦�"); @@ -1301,8 +1385,8 @@ if (!mediaServerItem.isRtpEnable()) { streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase(); } - JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamKey); - if (jsonObject == null || jsonObject.getInteger("code") != 0) { + Boolean result = mediaServerService.pauseRtpCheck(mediaServerItem, streamKey); + if (!result) { throw new ServiceException("鏆傚仠RTP鎺ユ敹澶辫触"); } Device device = storager.queryVideoDevice(inviteInfo.getDeviceId()); @@ -1318,7 +1402,7 @@ } inviteInfo.getStreamInfo().setPause(false); inviteStreamService.updateInviteInfo(inviteInfo); - MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); if (null == mediaServerItem) { logger.warn("mediaServer 涓嶅瓨鍦�!"); throw new ServiceException("mediaServer涓嶅瓨鍦�"); @@ -1329,8 +1413,8 @@ if (!mediaServerItem.isRtpEnable()) { streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase(); } - JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamKey); - if (jsonObject == null || jsonObject.getInteger("code") != 0) { + boolean result = mediaServerService.resumeRtpCheck(mediaServerItem, streamKey); + if (!result) { throw new ServiceException("缁х画RTP鎺ユ敹澶辫触"); } Device device = storager.queryVideoDevice(inviteInfo.getDeviceId()); @@ -1341,7 +1425,7 @@ public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) { // 寮�濮嬪彂娴� String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("[寮�濮嬫帹娴乚 rtp/{}, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map<String, Object> param = new HashMap<>(12); @@ -1402,6 +1486,14 @@ logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), sendRtpItem.isTcpActive()?"琚姩鍙戞祦": param.get("dst_url") + ":" + param.get("dst_port")); + if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && correlationInfo instanceof ParentPlatform) { + ParentPlatform platform = (ParentPlatform)correlationInfo; + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), + sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(), + sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); + } } else { logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}", jsonObject.getString("msg"), JSONObject.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { @@ -1430,7 +1522,7 @@ } @Override - public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event) { + public void talkCmd(Device device, String channelId, MediaServer mediaServerItem, String stream, AudioBroadcastEvent event) { if (device == null || channelId == null) { return; } @@ -1447,7 +1539,7 @@ SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬� - MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady) { logger.warn("[璇煶瀵硅] 姝e湪璇煶骞挎挱锛屾棤娉曞紑鍚闊抽�氳瘽锛� {}", channelId); @@ -1461,7 +1553,7 @@ SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null); if (sendRtpItem != null) { - MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); if (streamReady) { logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId); @@ -1506,7 +1598,7 @@ return; } - MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId); + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (streamIsReady == null || streamIsReady) { Map<String, Object> param = new HashMap<>(); @@ -1541,7 +1633,7 @@ if (inviteInfo != null) { if (inviteInfo.getStreamInfo() != null) { // 宸插瓨鍦ㄧ嚎鐩存帴鎴浘 - MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); String streamUrl; if (mediaServerItemInuse.getRtspPort() != 0) { streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream()); @@ -1551,7 +1643,7 @@ String path = "snap"; // 璇锋眰鎴浘 logger.info("[璇锋眰鎴浘]: " + fileName); - zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); + mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); File snapFile = new File(path + File.separator + fileName); if (snapFile.exists()) { errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile()); @@ -1562,7 +1654,7 @@ } } - MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + MediaServer newMediaServerItem = getNewMediaServerItem(device); play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{ if (code == InviteErrorCode.SUCCESS.getCode()) { InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); -- Gitblit v1.8.0