From f961515317a33fe965287ca5c978b85e9ce1abcc Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 27 六月 2023 17:18:46 +0800
Subject: [PATCH] 合并主线
---
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 39 +++++++------------
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 12 ++---
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 16 ++++----
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 5 +-
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java | 22 +++++-----
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java | 2
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java | 3 +
7 files changed, 45 insertions(+), 54 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
index a160921..f32d420 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -56,7 +56,7 @@
//via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(),
- Integer.parseInt(parentPlatform.getDevicePort()), parentPlatform.getTransport(), SipUtils.getNewViaTag());
+ parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag());
viaHeader.setRPort();
viaHeaders.add(viaHeader);
//from
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index bbaf22c..ffd41db 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -6,9 +6,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@@ -624,9 +622,9 @@
logger.info("[璇煶鍠婅瘽] {} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
- subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
+ subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> {
if (event != null) {
- event.response(mediaServerItemInUse, json);
+ event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribeForStreamChange);
}
});
@@ -634,9 +632,9 @@
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
callIdHeader.setCallId(callId);
HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId());
- subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
+ subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> {
if (eventForPush != null) {
- eventForPush.response(mediaServerItemInUse, json);
+ eventForPush.response(mediaServerItemInUse, hookParam);
}
});
//
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index 792d901..30d6256 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -19,6 +19,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -899,9 +900,9 @@
logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
- subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
if (event != null) {
- event.response(mediaServerItemInUse, json);
+ event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
}
});
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
index ab54d15..a970eb5 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
@@ -10,6 +10,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@@ -127,10 +128,9 @@
// 娑堟伅鍙戦�佹垚鍔燂紝 鍚戜笂绾у彂閫乮nvite锛岃幏鍙栨帹娴�
try {
- platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{
+ platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, hookParam)->{
+ OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
// 涓婄骇骞冲彴鎺ㄦ祦鎴愬姛
- String app = response.getString("app");
- String stream = response.getString("stream");
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId);
if (broadcastCatch != null ) {
if (playService.audioBroadcastInUse(device, targetId)) {
@@ -138,24 +138,24 @@
platform.getServerGBId(), deviceChannel.getChannelId());
// 鏌ョ湅璇煶閫氶亾宸茬粡寤虹珛涓斿凡缁忓崰鐢� 鍥炲BYE
try {
- platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream);
+ platformService.stopBroadcast(platform, deviceChannel.getChannelId(), streamChangedHookParam.getStream());
} catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException |
SipException e) {
logger.info("[娑堟伅鍙戦�佸け璐 鍥芥爣绾ц仈 璇煶鍠婅瘽 platform锛� {}锛� channel: {}", platform.getServerGBId(), deviceChannel.getChannelId());
}
}else {
// 鏌ョ湅璇煶閫氶亾宸茬粡寤虹珛浣嗘槸鏈崰鐢�
- broadcastCatch.setApp(app);
- broadcastCatch.setStream(stream);
+ broadcastCatch.setApp(streamChangedHookParam.getApp());
+ broadcastCatch.setStream(streamChangedHookParam.getStream());
broadcastCatch.setMediaServerItem(mediaServerItem);
audioBroadcastManager.update(broadcastCatch);
// 鎺ㄦ祦鍒拌澶�
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null);
+ SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null);
if (sendRtpItem == null) {
- logger.warn("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 寮傚父锛屾湭鎵惧埌鍙戞祦淇℃伅锛� channelId: {}, stream: {}", targetId, stream);
- logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 閲嶆柊寮�濮嬶紝channelId: {}, stream: {}", targetId, stream);
+ logger.warn("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 寮傚父锛屾湭鎵惧埌鍙戞祦淇℃伅锛� channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
+ logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽 閲嶆柊寮�濮嬶紝channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
try {
- playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> {
+ playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
logger.info("[璇煶鍠婅瘽] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", device.getDeviceId(), targetId);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
@@ -173,7 +173,7 @@
}
}else {
try {
- playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> {
+ playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
logger.info("[璇煶鍠婅瘽] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", device.getDeviceId(), targetId);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
index 95fb3ac..44bf11b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -7,6 +7,7 @@
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -28,7 +29,7 @@
ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback);
- StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId);
+ StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId);
MediaServerItem getNewMediaServerItem(Device device);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index 6c33770..c43591b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
-import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -16,6 +15,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@@ -442,10 +442,11 @@
inviteStreamService.removeInviteInfo(inviteInfo);
}else {
// 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉
- JSONObject json = new JSONObject();
- json.put("app", inviteInfo.getStreamInfo().getApp());
- json.put("stream", inviteInfo.getStreamInfo().getStream());
- hookEvent.response(mediaServerItemForStreamInfo, json);
+ OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam();
+ hookParam.setApp(inviteInfo.getStreamInfo().getApp());
+ hookParam.setStream(inviteInfo.getStreamInfo().getStream());
+
+ hookEvent.response(mediaServerItemForStreamInfo, hookParam);
return;
}
}
@@ -498,14 +499,14 @@
}
}
}, userSetting.getPlayTimeout());
- commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
+ commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{
logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
dynamicTask.stop(timeOutTaskKey);
// hook鍝嶅簲
- playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
+ playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId);
// 鏀跺埌娴�
if (hookEvent != null) {
- hookEvent.response(mediaServerItem, response);
+ hookEvent.response(mediaServerItem, hookParam);
}
}, event -> {
// 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc
@@ -524,30 +525,20 @@
logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse);
if (!mediaServerItem.isRtpEnable()) {
logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
-
- if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
- // ssrc 涓嶅彲鐢�
- // 閲婃斁ssrc
- mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
- streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
- event.msg = "涓嬬骇鑷畾涔変簡ssrc,浣嗘槸姝src涓嶅彲鐢�";
- event.statusCode = 400;
- errorEvent.response(event);
- return;
- }
-
+ // 閲婃斁ssrc
+ mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
// 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚�
if (!mediaServerItem.isRtpEnable()) {
// 娣诲姞璁㈤槄
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
- subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
- logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
+ subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
+ logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// hook鍝嶅簲
- playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
- hookEvent.response(mediaServerItemInUse, response);
+ playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
+ hookEvent.response(mediaServerItemInUse, hookParam);
});
}
// 鍏抽棴rtp server
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index c345770..65c479f 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -293,12 +293,12 @@
// 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴�
try {
- cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
- logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + response.toJSONString());
+ cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
+ logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// TODO 鏆備笉鍋氬鐞�
- }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
- logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + json.toJSONString());
+ }, (mediaServerItemInuse, hookParam) -> {
+ logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookParam);
dynamicTask.stop(timeOutTaskKey);
}, (event) -> {
@@ -617,10 +617,10 @@
}
@Override
- public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
- StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
+ public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
+ OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
+ StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
Device device = redisCatchStorage.getDevice(deviceId);
- OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
if (streamInfo != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
@@ -1571,7 +1571,7 @@
}
}
- talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
+ talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
logger.info("[璇煶瀵硅] 鏀跺埌璁惧鍙戞潵鐨勬祦");
}, eventResult -> {
logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{}, 閿欒鐮� {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
--
Gitblit v1.8.0