From 5cae58c69129631c80c611d1870c3a20721cebf4 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 14 二月 2023 10:04:48 +0800
Subject: [PATCH] Merge branch 'wvp-28181-2.0' into main2
---
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 215 ++++++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 192 insertions(+), 23 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index fe67ede..1ce02c4 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,15 +1,25 @@
package com.genersoft.iot.vmp.service.impl;
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
+import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
+import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
@@ -21,12 +31,13 @@
import org.springframework.stereotype.Service;
import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
import javax.sip.SipException;
-import javax.sip.TimeoutEvent;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* @author lin
@@ -65,6 +76,16 @@
@Autowired
private UserSetting userSetting;
+
+ @Autowired
+ private ZlmHttpHookSubscribe subscribe;
+
+ @Autowired
+ private VideoStreamSessionManager streamSession;
+
+
+ @Autowired
+ private IPlayService playService;
@@ -131,20 +152,16 @@
}
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
- if (dynamicTask.contains(registerTaskKey)) {
- dynamicTask.stop(registerTaskKey);
- }
- // 娣诲姞娉ㄥ唽浠诲姟
- dynamicTask.startDelay(registerTaskKey,
+ if (!dynamicTask.isAlive(registerTaskKey)) {
+ // 娣诲姞娉ㄥ唽浠诲姟
+ dynamicTask.startCron(registerTaskKey,
// 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
()-> {
- try {
- commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null);
- } catch (InvalidArgumentException | ParseException | SipException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
- }
+ registerTask(parentPlatform);
},
(parentPlatform.getExpires() - 10) *1000);
+ }
+
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.contains(keepaliveTaskKey)) {
@@ -160,16 +177,11 @@
// 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
if (platformCatch.getKeepAliveReply() == 2) {
// 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
- offline(parentPlatform);
logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId());
try {
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId());
- // 娣诲姞娉ㄥ唽浠诲姟
- dynamicTask.startCron(registerTaskKey,
- // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
- ()->logger.info("[鍥芥爣绾ц仈] {},骞冲彴绂荤嚎鍚庢寔缁彂璧锋敞鍐岋紝澶辫触", parentPlatform.getServerGBId()),
- 60*1000);
+ offline(parentPlatform, false);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage());
@@ -197,8 +209,30 @@
}
}
+ private void registerTask(ParentPlatform parentPlatform){
+ try {
+ // 璁剧疆瓒呮椂閲嶅彂锛� 鍚庣画浠庡簳灞傛敮鎸佹秷鎭噸鍙�
+ String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
+ if (dynamicTask.isAlive(key)) {
+ return;
+ }
+ dynamicTask.startDelay(key, ()->{
+ registerTask(parentPlatform);
+ }, 1000);
+ logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛岄噸鏂版敞鍐�", parentPlatform.getServerGBId());
+ commanderForPlatform.register(parentPlatform, eventResult -> {
+ dynamicTask.stop(key);
+ offline(parentPlatform, false);
+ },eventResult -> {
+ dynamicTask.stop(key);
+ });
+ } catch (InvalidArgumentException | ParseException | SipException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
+ }
+ }
+
@Override
- public void offline(ParentPlatform parentPlatform) {
+ public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
logger.info("[骞冲彴绂荤嚎]锛歿}", parentPlatform.getServerGBId());
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatformCatch.setKeepAliveReply(0);
@@ -212,11 +246,13 @@
// 鍋滄鎵�鏈夋帹娴�
logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId());
stopAllPush(parentPlatform.getServerGBId());
- // 娓呴櫎娉ㄥ唽瀹氭椂
- logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
- final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
- if (dynamicTask.contains(registerTaskKey)) {
- dynamicTask.stop(registerTaskKey);
+ if (stopRegister) {
+ // 娓呴櫎娉ㄥ唽瀹氭椂
+ logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
+ final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
+ if (dynamicTask.contains(registerTaskKey)) {
+ dynamicTask.stop(registerTaskKey);
+ }
}
// 娓呴櫎蹇冭烦瀹氭椂
logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId());
@@ -296,4 +332,137 @@
}
}
}
+
+ @Override
+ public void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent,
+ SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
+
+ if (mediaServerItem == null) {
+ logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽鏈壘鍒板彲鐢ㄧ殑zlm. platform: {}", platform.getServerGBId());
+ return;
+ }
+ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId);
+ if (streamInfo != null) {
+ // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲
+ MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId());
+ if (mediaServerItemForStreamInfo != null) {
+ Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream());
+ if (!ready) {
+ // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁
+ redisCatchStorage.stopPlay(streamInfo);
+ }else {
+ // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉
+ JSONObject json = new JSONObject();
+ json.put("app", streamInfo.getApp());
+ json.put("stream", streamInfo.getStream());
+ hookEvent.response(mediaServerItemForStreamInfo, json);
+ return;
+ }
+ }
+ }
+
+ String streamId = null;
+ if (mediaServerItem.isRtpEnable()) {
+ streamId = String.format("%s_%s", platform.getServerGBId(), channelId);
+ }
+ // 榛樿涓嶈繘琛孲SRC鏍¢獙锛� TODO 鍚庣画鍙敼涓洪厤缃�
+ boolean ssrcCheck = false;
+ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrcCheck, false);
+ if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
+ logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 寮�鍚鍙g洃鍚け璐ワ紝 platform: {}, channel锛� {}", platform.getServerGBId(), channelId);
+ errorEvent.response(new SipSubscribe.EventResult(-1, "绔彛鐩戝惉澶辫触"));
+ return;
+ }
+ logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}",
+ platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
+
+ String timeOutTaskKey = UUID.randomUUID().toString();
+ dynamicTask.startDelay(timeOutTaskKey, () -> {
+ // 鎵ц瓒呮椂浠诲姟鏃舵煡璇㈡槸鍚﹀凡缁忔垚鍔燂紝鎴愬姛浜嗗垯涓嶆墽琛岃秴鏃朵换鍔★紝闃叉瓒呮椂浠诲姟鍙栨秷澶辫触鐨勬儏鍐�
+ if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) {
+ logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
+ // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧�
+ try {
+ commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
+ } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
+ logger.error("[鐐规挱瓒呮椂]锛� 鍙戦�丅YE澶辫触 {}", e.getMessage());
+ } finally {
+ timeoutCallback.run(1, "鏀舵祦瓒呮椂");
+ mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+ streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+ }
+ }
+ }, userSetting.getPlayTimeout());
+ commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
+ dynamicTask.stop(timeOutTaskKey);
+ // hook鍝嶅簲
+ playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
+ // 鏀跺埌娴�
+ if (hookEvent != null) {
+ hookEvent.response(mediaServerItem, response);
+ }
+ }, event -> {
+ // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc
+ ResponseEvent responseEvent = (ResponseEvent) event.event;
+ String contentString = new String(responseEvent.getResponse().getRawContent());
+ // 鑾峰彇ssrc
+ int ssrcIndex = contentString.indexOf("y=");
+ // 妫�鏌ユ槸鍚︽湁y瀛楁
+ if (ssrcIndex >= 0) {
+ //ssrc瑙勫畾闀垮害涓�10瀛楄妭锛屼笉鍙栦綑涓嬮暱搴︿互閬垮厤鍚庣画杩樻湁鈥渇=鈥濆瓧娈� TODO 鍚庣画瀵逛笉瑙勮寖鐨勯潪10浣峴src鍏煎
+ String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
+ // 鏌ヨ鍒皊src涓嶄竴鑷翠笖寮�鍚簡ssrc鏍¢獙鍒欓渶瑕侀拡瀵瑰鐞�
+ if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
+ return;
+ }
+ logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse);
+ if (!mediaServerItem.isRtpEnable()) {
+ logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
+
+ if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
+ // ssrc 涓嶅彲鐢�
+ // 閲婃斁ssrc
+ mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+ streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
+ event.msg = "涓嬬骇鑷畾涔変簡ssrc,浣嗘槸姝src涓嶅彲鐢�";
+ event.statusCode = 400;
+ errorEvent.response(event);
+ return;
+ }
+
+ // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚�
+ if (!mediaServerItem.isRtpEnable()) {
+ // 娣诲姞璁㈤槄
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
+ subscribe.removeSubscribe(hookSubscribe);
+ hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+ logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
+ dynamicTask.stop(timeOutTaskKey);
+ // hook鍝嶅簲
+ playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
+ hookEvent.response(mediaServerItemInUse, response);
+ });
+ }
+ // 鍏抽棴rtp server
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+ // 閲嶆柊寮�鍚痵src server
+ mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort());
+
+ }
+ }
+ }, eventResult -> {
+ // 鏀跺埌閿欒鍥炲
+ if (errorEvent != null) {
+ errorEvent.response(eventResult);
+ }
+ });
+ }
+
+ @Override
+ public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException {
+ commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null);
+ }
}
--
Gitblit v1.8.0