From 3a056acbc1f0a5b601880659fe7719ca3170a9d6 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 24 五月 2024 10:18:29 +0800
Subject: [PATCH] 合并271分支
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 156 +++++++++++++++++++++++++--------------------------
1 files changed, 76 insertions(+), 80 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 3a7f452..b639048 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -10,8 +10,8 @@
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -24,11 +24,12 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
+import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
@@ -47,7 +48,6 @@
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -81,6 +81,10 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private IRedisRpcService redisRpcService;
+
@Autowired
private IInviteStreamService inviteStreamService;
@@ -101,9 +105,6 @@
@Autowired
private EventPublisher eventPublisher;
-
- @Autowired
- private ZLMMediaListManager zlmMediaListManager;
@Autowired
private ZlmHttpHookSubscribe subscribe;
@@ -129,6 +130,9 @@
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
+
+ @Autowired
+ private IStreamPushService streamPushService;
/**
* 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -175,7 +179,7 @@
}
});
if (!"rtp".equals(param.getApp())) {
- Map<String, String> paramMap = urlParamToMap(param.getParams());
+ Map<String, String> paramMap = MediaServerUtils.urlParamToMap(param.getParams());
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
if (streamAuthorityInfo != null && streamAuthorityInfo.getCallId() != null && !streamAuthorityInfo.getCallId().equals(paramMap.get("callId"))) {
return new HookResult(401, "Unauthorized");
@@ -216,7 +220,7 @@
logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
return new HookResultForOnPublish(401, "Unauthorized");
}
- Map<String, String> paramMap = urlParamToMap(param.getParams());
+ Map<String, String> paramMap = MediaServerUtils.urlParamToMap(param.getParams());
String sign = paramMap.get("sign");
if (sign == null) {
logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
@@ -236,10 +240,7 @@
// 閴存潈閫氳繃
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
}
- } else {
- zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
}
-
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
result.setEnable_audio(true);
@@ -260,7 +261,6 @@
if ("rtp".equals(param.getApp())) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
// 鍗曠鍙fā寮忎笅淇敼娴� ID
if (!mediaInfo.isRtpEnable() && inviteInfo == null) {
String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
@@ -268,6 +268,8 @@
if (inviteInfo != null) {
result.setStream_replace(inviteInfo.getStream());
logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
+ // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲
+ param.setStream(inviteInfo.getStream());
}
}
@@ -287,7 +289,7 @@
String channelId = ssrcTransactionForAll.get(0).getChannelId();
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
- result.setEnable_audio(deviceChannel.isHasAudio());
+ result.setEnable_audio(deviceChannel.getHasAudio());
}
// 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺
if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
@@ -347,6 +349,11 @@
MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
if (mediaInfo == null) {
logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId());
+ return;
+ }
+ if (!ObjectUtils.isEmpty(mediaInfo.getTranscodeSuffix())
+ && !"null".equalsIgnoreCase(mediaInfo.getTranscodeSuffix())
+ && param.getStream().endsWith(mediaInfo.getTranscodeSuffix()) ) {
return;
}
if (subscribe != null) {
@@ -464,8 +471,7 @@
|| param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
param.setSeverId(userSetting.getServerId());
- zlmMediaListManager.addPush(param);
-
+ streamPushService.updatePush(param);
// 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
}
@@ -482,10 +488,13 @@
}
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
- if (gbStream != null) {
-// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+ // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+ if (gbStream == null) {
+ storager.removeMedia(param.getApp(), param.getStream());
+ }else {
+// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+ storager.mediaOffline(param.getApp(), param.getStream());
}
- zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
@@ -512,40 +521,35 @@
if (sendRtpItem == null) {
continue;
}
-
if (sendRtpItem.getApp().equals(param.getApp())) {
- logger.info(sendRtpItem.toString());
- if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
- // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
-// redisCatchStorage.sendRtp
- }else {
- String platformId = sendRtpItem.getPlatformId();
- ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
- Device device = deviceService.getDevice(platformId);
-
- try {
- if (platform != null) {
- commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
- redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
- } else {
- cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
- if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
- || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
- AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- if (audioBroadcastCatch != null) {
- // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
- logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- }
+ // 鍦╤ook鏀跺埌杩欎釜娑堟伅锛岃鏄庡彂娴佷竴瀹氭槸鏈骇瀹屾垚鐨勩��
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+ ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+ Device device = deviceService.getDevice(sendRtpItem.getPlatformId());
+ try {
+ if (platform != null) {
+ commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem);
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+ } else if (device != null) {
+ cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+ if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+ || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ if (audioBroadcastCatch != null) {
+ // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+ logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
}
- } catch (SipException | InvalidArgumentException | ParseException |
- SsrcTransactionNotFoundException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+ }else {
+ // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+ redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey());
}
+ } catch (SipException | InvalidArgumentException | ParseException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
}
-
}
}
}
@@ -564,6 +568,19 @@
logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
param.getApp(), param.getStream());
+
+ MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
+ if (mediaInfo == null) {
+ JSONObject ret = new JSONObject();
+ ret.put("code", 0);
+ return ret;
+ }
+ if (!ObjectUtils.isEmpty(mediaInfo.getTranscodeSuffix())
+ && !"null".equalsIgnoreCase(mediaInfo.getTranscodeSuffix())
+ && param.getStream().endsWith(mediaInfo.getTranscodeSuffix()) ) {
+ param.setStream(param.getStream().substring(0, param.getStream().lastIndexOf(mediaInfo.getTranscodeSuffix()) -1 ));
+ }
+
JSONObject ret = new JSONObject();
ret.put("code", 0);
// 鍥芥爣绫诲瀷鐨勬祦
@@ -590,14 +607,10 @@
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
}
- redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem);
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform);
}
}
}
@@ -825,17 +838,18 @@
}
taskExecutor.execute(() -> {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
- if (sendRtpItems.size() > 0) {
+ if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
- ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
- try {
- commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+ if(parentPlatform != null) {
+ try {
+ commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+ }
}
- redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem);
}
}
});
@@ -884,23 +898,5 @@
});
return HookResult.SUCCESS();
- }
-
- private Map<String, String> urlParamToMap(String params) {
- HashMap<String, String> map = new HashMap<>();
- if (ObjectUtils.isEmpty(params)) {
- return map;
- }
- String[] paramsArray = params.split("&");
- if (paramsArray.length == 0) {
- return map;
- }
- for (String param : paramsArray) {
- String[] paramArray = param.split("=");
- if (paramArray.length == 2) {
- map.put(paramArray[0], paramArray[1]);
- }
- }
- return map;
}
}
--
Gitblit v1.8.0