From 678bb3356232af320cc2686fa94223f8a8057093 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 18 四月 2024 18:07:47 +0800
Subject: [PATCH] 临时提交
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 107 +++++++++++++++++++++++++----------------------------
1 files changed, 51 insertions(+), 56 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 3e51d38..4d29532 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -10,8 +10,8 @@
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -24,8 +24,8 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -73,9 +73,6 @@
private AudioBroadcastManager audioBroadcastManager;
@Autowired
- private ZLMServerFactory zlmServerFactory;
-
- @Autowired
private IPlayService playService;
@Autowired
@@ -83,6 +80,10 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+
+
+ @Autowired
+ private IRedisRpcService redisRpcService;
@Autowired
private IInviteStreamService inviteStreamService;
@@ -106,9 +107,6 @@
private EventPublisher eventPublisher;
@Autowired
- private ZLMMediaListManager zlmMediaListManager;
-
- @Autowired
private ZlmHttpHookSubscribe subscribe;
@Autowired
@@ -124,9 +122,6 @@
private VideoStreamSessionManager sessionManager;
@Autowired
- private AssistRESTfulUtils assistRESTfulUtils;
-
- @Autowired
private SSRCFactory ssrcFactory;
@Qualifier("taskExecutor")
@@ -135,6 +130,9 @@
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
+
+ @Autowired
+ private IStreamPushService streamPushService;
/**
* 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -147,7 +145,7 @@
taskExecutor.execute(() -> {
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
- if (subscribes != null && subscribes.size() > 0) {
+ if (subscribes != null && !subscribes.isEmpty()) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, param);
}
@@ -166,7 +164,7 @@
@PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
public HookResult onPlay(@RequestBody OnPlayHookParam param) {
if (logger.isDebugEnabled()) {
- logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}" + param.getMediaServerId(), param);
+ logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}", param.getMediaServerId(), param);
}
String mediaServerId = param.getMediaServerId();
@@ -242,21 +240,14 @@
// 閴存潈閫氳繃
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
}
- } else {
- zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
}
-
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
result.setEnable_audio(true);
taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
- if (mediaInfo != null) {
- subscribe.response(mediaInfo, param);
- } else {
- new HookResultForOnPublish(1, "zlm not register");
- }
+ subscribe.response(mediaInfo, param);
}
});
@@ -270,7 +261,6 @@
if ("rtp".equals(param.getApp())) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
// 鍗曠鍙fā寮忎笅淇敼娴� ID
if (!mediaInfo.isRtpEnable() && inviteInfo == null) {
String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
@@ -278,6 +268,8 @@
if (inviteInfo != null) {
result.setStream_replace(inviteInfo.getStream());
logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
+ // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲
+ param.setStream(inviteInfo.getStream());
}
}
@@ -380,7 +372,6 @@
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
}
}
-
if ("rtsp".equals(param.getSchema())) {
logger.info("娴佸彉鍖栵細娉ㄥ唽->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
if (param.isRegist()) {
@@ -475,8 +466,7 @@
|| param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
param.setSeverId(userSetting.getServerId());
- zlmMediaListManager.addPush(param);
-
+ streamPushService.updatePush(param);
// 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
}
@@ -493,10 +483,13 @@
}
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
- if (gbStream != null) {
-// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+ // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+ if (gbStream == null) {
+ storager.removeMedia(param.getApp(), param.getStream());
+ }else {
+// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+ storager.mediaOffline(param.getApp(), param.getStream());
}
- zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
@@ -520,17 +513,20 @@
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
- if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
- String platformId = sendRtpItem.getPlatformId();
- ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
- Device device = deviceService.getDevice(platformId);
-
+ if (sendRtpItem == null) {
+ continue;
+ }
+ if (sendRtpItem.getApp().equals(param.getApp())) {
+ // 鍦╤ook鏀跺埌杩欎釜娑堟伅锛岃鏄庡彂娴佷竴瀹氭槸鏈骇瀹屾垚鐨勩��
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem);
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+ ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+ Device device = deviceService.getDevice(sendRtpItem.getPlatformId());
try {
if (platform != null) {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
- redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
- } else {
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+ } else if (device != null) {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|| sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
@@ -541,6 +537,9 @@
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
}
+ }else {
+ // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+ redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey());
}
} catch (SipException | InvalidArgumentException | ParseException |
SsrcTransactionNotFoundException e) {
@@ -580,9 +579,9 @@
}
// 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ��
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
- List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
inviteInfo.getChannelId());
- if (sendRtpItems.size() > 0) {
+ if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
try {
@@ -590,14 +589,10 @@
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
}
- redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem);
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform);
}
}
}
@@ -799,7 +794,7 @@
logger.info("[ZLM HOOK] zlm 鍚姩 " + zlmServerConfig.getGeneralMediaServerId());
taskExecutor.execute(() -> {
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
- if (subscribes != null && subscribes.size() > 0) {
+ if (subscribes != null && !subscribes.isEmpty()) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, zlmServerConfig);
}
@@ -825,17 +820,18 @@
}
taskExecutor.execute(() -> {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
- if (sendRtpItems.size() > 0) {
+ if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
- ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
- try {
- commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+ if(parentPlatform != null) {
+ try {
+ commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+ }
}
- redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem);
}
}
});
@@ -848,12 +844,11 @@
*/
@ResponseBody
@PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
- public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam
+ public HookResult onRtpServerTimeout(@RequestBody OnRtpServerTimeoutHookParam
param) {
logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
taskExecutor.execute(() -> {
- JSONObject json = (JSONObject) JSON.toJSON(param);
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
if (subscribes != null && !subscribes.isEmpty()) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
--
Gitblit v1.8.0