From 426ea72d7f31e0f4c5ad90c48d98299b9ac83e45 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 01 四月 2024 15:46:36 +0800
Subject: [PATCH] 修复拉流代理播放
---
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 167 +++++++++++++++++++++++++++++++++++++++----------------
1 files changed, 117 insertions(+), 50 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 65cf693..32af057 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -17,17 +17,21 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
-import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
-import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.bean.RecordInfo;
+import com.genersoft.iot.vmp.media.event.hook.Hook;
+import com.genersoft.iot.vmp.media.event.hook.HookData;
+import com.genersoft.iot.vmp.media.event.hook.HookType;
+import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -91,13 +95,10 @@
private IInviteStreamService inviteStreamService;
@Autowired
- private ZlmHttpHookSubscribe subscribe;
+ private HookSubscribe subscribe;
@Autowired
private SendRtpPortManager sendRtpPortManager;
-
- @Autowired
- private IMediaService mediaService;
@Autowired
private IMediaServerService mediaServerService;
@@ -172,6 +173,33 @@
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+ if (!sendRtpItems.isEmpty()) {
+ for (SendRtpItem sendRtpItem : sendRtpItems) {
+ if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
+ String platformId = sendRtpItem.getPlatformId();
+ Device device = deviceService.getDevice(platformId);
+ try {
+ if (device != null) {
+ cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
+ if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+ || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ if (audioBroadcastCatch != null) {
+ // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+ logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ }
+ }
+ }
+ } catch (SipException | InvalidArgumentException | ParseException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+ }
+ }
+ }
+ }
+
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
if (event.getStream().indexOf("_") > 0) {
String[] streamArray = event.getStream().split("_");
@@ -188,9 +216,55 @@
}else if ("talk".equals(event.getApp())) {
stopTalk(device, channelId, false);
}
-
}
}
+ }
+ }
+
+ /**
+ * 娴佹湭鎵惧埌鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaNotFoundEvent event) {
+ if (!"rtp".equals(event.getApp())) {
+ return;
+ }
+ String[] s = event.getStream().split("_");
+ if ((s.length != 2 && s.length != 4)) {
+ return;
+ }
+ String deviceId = s[0];
+ String channelId = s[1];
+ Device device = redisCatchStorage.getDevice(deviceId);
+ if (device == null || !device.isOnLine()) {
+ return;
+ }
+ DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+ if (deviceChannel == null) {
+ return;
+ }
+ if (s.length == 2) {
+ logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
+ play(event.getMediaServer(), deviceId, channelId, null, null);
+ } else if (s.length == 4) {
+ // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿
+ String startTimeStr = s[2];
+ String endTimeStr = s[3];
+ if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
+ return;
+ }
+ String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
+ String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
+ logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}",
+ event.getMediaServer().getId(), event.getSchema(),
+ event.getApp(), event.getStream(),
+ startTime, endTime
+ );
+
+ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
+ device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
+ playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
}
}
@@ -265,7 +339,7 @@
}
private void talk(MediaServer mediaServerItem, Device device, String channelId, String stream,
- ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+ HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@@ -347,12 +421,12 @@
// 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴�
try {
- cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
- logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookParam);
+ cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> {
+ logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookData);
dynamicTask.stop(timeOutTaskKey);
// TODO 鏆備笉鍋氬鐞�
- }, (mediaServerItemInuse, hookParam) -> {
- logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookParam);
+ }, (hookData) -> {
+ logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookData);
dynamicTask.stop(timeOutTaskKey);
}, (event) -> {
@@ -462,8 +536,7 @@
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 鍙栨秷璁㈤槄娑堟伅鐩戝惉
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
- subscribe.removeSubscribe(hookSubscribe);
+ subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
}
}else {
logger.info("[鐐规挱瓒呮椂] 鏀舵祦瓒呮椂 deviceId: {}, channelId: {},鐮佹祦锛歿}锛岀鍙o細{}, SSRC: {}",
@@ -478,11 +551,11 @@
}, userSetting.getPlayTimeout());
try {
- cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> {
- logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + hookParam);
+ cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> {
+ logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + hookData);
dynamicTask.stop(timeOutTaskKey);
// hook鍝嶅簲
- StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId());
+ StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId());
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
@@ -498,7 +571,7 @@
streamInfo);
logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId:{}, 鐮佹祦绫诲瀷锛歿}", device.getDeviceId(), channel.getChannelId(),
channel.getStreamIdentification());
- snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
+ snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
}, (eventResult) -> {
// 澶勭悊鏀跺埌200ok鍚庣殑TCP涓诲姩杩炴帴浠ュ強SSRC涓嶄竴鑷寸殑闂
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(),
@@ -624,11 +697,10 @@
mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
}
- public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
+ public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
StreamInfo streamInfo = null;
Device device = redisCatchStorage.getDevice(deviceId);
- OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
- streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
+ streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
if (streamInfo != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
@@ -646,9 +718,8 @@
}
- private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
- OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
- StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
+ private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
+ StreamInfo streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
if (streamInfo != null) {
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
@@ -657,7 +728,7 @@
deviceChannel.setStreamId(streamInfo.getStream());
storager.startPlay(deviceId, channelId, streamInfo.getStream());
}
- InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream());
+ InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, mediaInfo.getStream());
if (inviteInfo != null) {
inviteInfo.setStatus(InviteSessionStatus.ok);
@@ -763,10 +834,10 @@
inviteStreamService.removeInviteInfo(inviteInfo);
};
- ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
- logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + hookParam);
+ HookSubscribe.Event hookEvent = (hookData) -> {
+ logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + hookData);
dynamicTask.stop(playBackTimeOutTaskKey);
- StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
+ StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("璁惧鍥炴斁API璋冪敤澶辫触锛�");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -952,10 +1023,10 @@
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
- ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
- logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + hookParam);
+ HookSubscribe.Event hookEvent = (hookData) -> {
+ logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + hookData);
dynamicTask.stop(downLoadTimeOutTaskKey);
- StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
+ StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("[褰曞儚涓嬭浇] 鑾峰彇娴佸湴鍧�淇℃伅澶辫触");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -973,26 +1044,24 @@
downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);
// 娉ㄥ唽褰曞儚鍥炶皟浜嬩欢锛屽綍鍍忎笅杞界粨鏉熷悗鍐欏叆涓嬭浇鍦板潃
- ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> {
+ HookSubscribe.Event hookEventForRecord = (hookData) -> {
logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅锛� 锛� {}/{}-{}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
- logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅鍐呭锛� " + hookParam);
- OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam;
- String filePath = recordMp4HookParam.getFile_path();
+ logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅鍐呭锛� " + hookData);
+ RecordInfo recordInfo = hookData.getRecordInfo();
+ String filePath = recordInfo.getFilePath();
DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
, inviteInfo.getChannelId(), inviteInfo.getStream());
inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
inviteStreamService.updateInviteInfo(inviteInfoForNew);
};
- HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(
- mediaServerItem.getId(), "rtp", ssrcInfo.getStream());
-
+ Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 璁剧疆杩囨湡鏃堕棿锛屼笅杞藉け璐ユ椂鑷姩澶勭悊璁㈤槄鏁版嵁
// long difference = DateUtil.getDifference(startTime, endTime)/1000;
// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
// hookSubscribe.setExpires(expiresInstant);
- subscribe.addSubscribe(hookSubscribe, hookEventForRecord);
+ subscribe.addSubscribe(hook, hookEventForRecord);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[鍛戒护鍙戦�佸け璐 褰曞儚涓嬭浇: {}", e.getMessage());
@@ -1058,9 +1127,8 @@
return inviteInfo.getStreamInfo();
}
- private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
- OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
- StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
+ private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
+ StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, mediaInfo, deviceId, channelId);
if (streamInfo != null) {
streamInfo.setProgress(0);
streamInfo.setStartTime(startTime);
@@ -1077,9 +1145,8 @@
}
- public StreamInfo onPublishHandler(MediaServer mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
- MediaInfo mediaInfo = MediaInfo.getInstance(hookParam);
- StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), mediaInfo, null);
+ public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
+ StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
return streamInfo;
@@ -1144,7 +1211,7 @@
AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
audioBroadcastResult.setApp(app);
audioBroadcastResult.setStream(stream);
- audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
+ audioBroadcastResult.setStreamInfo(new StreamContent(mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
audioBroadcastResult.setCodec("G.711");
return audioBroadcastResult;
}
@@ -1515,7 +1582,7 @@
}
}
- talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
+ talk(mediaServerItem, device, channelId, stream, (hookData) -> {
logger.info("[璇煶瀵硅] 鏀跺埌璁惧鍙戞潵鐨勬祦");
}, eventResult -> {
logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{}, 閿欒鐮� {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
--
Gitblit v1.8.0