From 3e2486d0abd2063afa9f8fab93dc60db774298af Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 08 八月 2023 09:50:31 +0800 Subject: [PATCH] Merge branch '2.6.8' into wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 5 src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java | 303 +++++++++++++++++++++++++++++++++++++ src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherPsSendInfo.java | 137 +++++++++++++++++ src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 2 4 files changed, 446 insertions(+), 1 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index a90670d..130f49d 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -158,7 +158,9 @@ public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_"; + public static final String WVP_OTHER_SEND_PS_INFO = "VMP_OTHER_SEND_PS_INFO_"; public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_"; + public static final String WVP_OTHER_RECEIVE_PS_INFO = "VMP_OTHER_RECEIVE_PS_INFO_"; /** * Redis Const diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 71bb5b8..0d6e69e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -298,7 +298,10 @@ if (param.getApp().equalsIgnoreCase("rtp")) { String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey); - if (otherRtpSendInfo != null) { + + String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream(); + OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS); + if (otherRtpSendInfo != null || otherPsSendInfo != null) { result.setEnable_mp4(true); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherPsSendInfo.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherPsSendInfo.java new file mode 100644 index 0000000..ac98409 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherPsSendInfo.java @@ -0,0 +1,137 @@ +package com.genersoft.iot.vmp.vmanager.bean; + +public class OtherPsSendInfo { + + /** + * 鍙戞祦IP + */ + private String sendLocalIp; + + /** + * 鍙戞祦绔彛 + */ + private int sendLocalPort; + + /** + * 鏀舵祦IP + */ + private String receiveIp; + + /** + * 鏀舵祦绔彛 + */ + private int receivePort; + + + /** + * 浼氳瘽ID + */ + private String callId; + + /** + * 娴両D + */ + private String stream; + + /** + * 鎺ㄦ祦搴旂敤鍚� + */ + private String pushApp; + + /** + * 鎺ㄦ祦娴両D + */ + private String pushStream; + + /** + * 鎺ㄦ祦SSRC + */ + private String pushSSRC; + + public String getSendLocalIp() { + return sendLocalIp; + } + + public void setSendLocalIp(String sendLocalIp) { + this.sendLocalIp = sendLocalIp; + } + + public int getSendLocalPort() { + return sendLocalPort; + } + + public void setSendLocalPort(int sendLocalPort) { + this.sendLocalPort = sendLocalPort; + } + + public String getReceiveIp() { + return receiveIp; + } + + public void setReceiveIp(String receiveIp) { + this.receiveIp = receiveIp; + } + + public int getReceivePort() { + return receivePort; + } + + public void setReceivePort(int receivePort) { + this.receivePort = receivePort; + } + + public String getCallId() { + return callId; + } + + public void setCallId(String callId) { + this.callId = callId; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getPushApp() { + return pushApp; + } + + public void setPushApp(String pushApp) { + this.pushApp = pushApp; + } + + public String getPushStream() { + return pushStream; + } + + public void setPushStream(String pushStream) { + this.pushStream = pushStream; + } + + public String getPushSSRC() { + return pushSSRC; + } + + public void setPushSSRC(String pushSSRC) { + this.pushSSRC = pushSSRC; + } + + @Override + public String toString() { + return "OtherPsSendInfo{" + + "sendLocalIp='" + sendLocalIp + '\'' + + ", sendLocalPort=" + sendLocalPort + + ", receiveIp='" + receiveIp + '\'' + + ", receivePort=" + receivePort + + ", callId='" + callId + '\'' + + ", stream='" + stream + '\'' + + ", pushApp='" + pushApp + '\'' + + ", pushStream='" + pushStream + '\'' + + ", pushSSRC='" + pushSSRC + '\'' + + '}'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java new file mode 100644 index 0000000..a2fd81b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -0,0 +1,303 @@ +package com.genersoft.iot.vmp.vmanager.ps; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.web.bind.annotation.*; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("rawtypes") +@Tag(name = "绗笁鏂筆S鏈嶅姟瀵规帴") + +@RestController +@RequestMapping("/api/ps") +public class PsController { + + private final static Logger logger = LoggerFactory.getLogger(PsController.class); + + @Autowired + private ZLMRTPServerFactory zlmServerFactory; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private UserSetting userSetting; + + @Autowired + private DynamicTask dynamicTask; + + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + + @GetMapping(value = "/receive/open") + @ResponseBody + @Operation(summary = "寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅") + @Parameter(name = "isSend", description = "鏄惁鍙戦�侊紝false鏃跺彧寮�鍚敹娴侊紝 true鍚屾椂杩斿洖鎺ㄦ祦淇℃伅", required = true) + @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼负浜嗕笌鍚庣画鎺ュ彛鍏宠仈", required = true) + @Parameter(name = "ssrc", description = "鏉ユ簮娴佺殑SSRC锛屼笉浼犲垯涓嶆牎楠屾潵婧恠src", required = false) + @Parameter(name = "stream", description = "褰㈡垚鐨勬祦鐨処D", required = true) + @Parameter(name = "tcpMode", description = "鏀舵祦妯″紡锛� 0涓篣DP锛� 1涓篢CP琚姩", required = true) + @Parameter(name = "callBack", description = "鍥炶皟鍦板潃锛屽鏋滄敹娴佽秴鏃朵細閫氶亾鍥炶皟閫氱煡锛屽洖璋冧负get璇锋眰锛屽弬鏁颁负callId", required = true) + public OtherPsSendInfo openRtpServer(Boolean isSend, @RequestParam(required = false)String ssrc, String callId, String stream, Integer tcpMode, String callBack) { + + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", + isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack); + + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + if (mediaServerItem == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"娌℃湁鍙敤鐨凪ediaServer"); + } + if (stream == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream鍙傛暟涓嶅彲涓虹┖"); + } + if (isSend != null && isSend && callId == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend涓簍rue鏃讹紝CallID涓嶈兘涓虹┖"); + } + int ssrcInt = 0; + if (ssrc != null) { + try { + ssrcInt = Integer.parseInt(ssrc); + }catch (NumberFormatException e) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒"); + } + } + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; + int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + if (localPort == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鑾峰彇绔彛澶辫触"); + } + // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 + if (callBack != null) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, + (mediaServerItemInUse, response)->{ + if (stream.equals(response.getString("stream_id"))) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId); + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.delete(receiveKey); + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + OkHttpClient client = httpClientBuilder.build(); + String url = callBack + "?callId=" + callId; + Request request = new Request.Builder().get().url(url).build(); + try { + client.newCall(request).execute(); + } catch (IOException e) { + logger.error("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); + } + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + } + }); + } + OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo(); + otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); + otherPsSendInfo.setReceivePort(localPort); + otherPsSendInfo.setCallId(callId); + otherPsSendInfo.setStream(stream); + + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(receiveKey, otherPsSendInfo); + if (isSend != null && isSend) { + String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; + // 棰勫垱寤哄彂娴佷俊鎭� + int port = sendRtpPortManager.getNextPort(mediaServerItem); + + otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherPsSendInfo.setSendLocalPort(port); + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS); + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherPsSendInfo); + } + return otherPsSendInfo; + } + + @GetMapping(value = "/receive/close") + @ResponseBody + @Operation(summary = "鍏抽棴鏀舵祦") + @Parameter(name = "stream", description = "娴佺殑ID", required = true) + public void closeRtpServer(String stream) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鏀舵祦] stream->{}", stream); + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + zlmServerFactory.closeRtpServer(mediaServerItem,stream); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream; + List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size() > 0) { + for (Object key : scan) { + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.delete(key); + } + } + } + + @GetMapping(value = "/send/start") + @ResponseBody + @Operation(summary = "鍙戦�佹祦") + @Parameter(name = "ssrc", description = "鍙戦�佹祦鐨凷SRC", required = true) + @Parameter(name = "dstIp", description = "鐩爣鏀舵祦IP", required = true) + @Parameter(name = "dstPort", description = "鐩爣鏀舵祦绔彛", required = true) + @Parameter(name = "app", description = "寰呭彂閫佸簲鐢ㄥ悕", required = true) + @Parameter(name = "stream", description = "寰呭彂閫佹祦Id", required = true) + @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) + @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true) + public void sendRTP(String ssrc, + String dstIp, + Integer dstPort, + String app, + String stream, + String callId, + Boolean isUdp + ) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] " + + "ssrc->{}, \r\n" + + "dstIp->{}, \n" + + "dstPort->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{} \n", + ssrc, + dstIp, + dstPort, + app, + stream, + callId); + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (sendInfo == null) { + sendInfo = new OtherRtpSendInfo(); + } + sendInfo.setPushApp(app); + sendInfo.setPushStream(stream); + sendInfo.setPushSSRC(ssrc); + + Map<String, Object> param; + + + param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",app); + param.put("stream",stream); + param.put("ssrc", ssrc); + + param.put("dst_url", dstIp); + param.put("dst_port", dstPort); + String is_Udp = isUdp ? "1" : "0"; + param.put("is_udp", is_Udp); + param.put("src_port", sendInfo.getSendLocalPortForAudio()); + param.put("use_ps", "0"); + param.put("only_audio", "1"); + + + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, param); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + }else { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); + String uuid = UUID.randomUUID().toString(); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + dynamicTask.startDelay(uuid, ()->{ + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId); + redisTemplate.delete(key); + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }, 10000); + + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, + (mediaServerItemInUse, response)->{ + dynamicTask.stop(uuid); + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, param); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }); + } + } + + @GetMapping(value = "/send/stop") + @ResponseBody + @Operation(summary = "鍏抽棴鍙戦�佹祦") + @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) + public void closeSendRTP(String callId) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鍙戦�佹祦] callId->{}", callId); + String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (sendInfo == null){ + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�"); + } + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendInfo.getPushApp()); + param.put("stream",sendInfo.getPushStream()); + param.put("ssrc",sendInfo.getPushSSRC()); + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param); + if (!result) { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鍙戦�佹祦] 澶辫触 callId->{}", callId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍋滄鍙戞祦澶辫触"); + }else { + logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鍙戦�佹祦] 鎴愬姛 callId->{}", callId); + } + redisTemplate.delete(key); + } + +} -- Gitblit v1.8.0