From 26cd7dbaf9394ff26f5229b18cfdb72c203a6bc3 Mon Sep 17 00:00:00 2001 From: gaofw189 <gaofw189@chinatelecom.cn> Date: 星期二, 10 一月 2023 15:55:56 +0800 Subject: [PATCH] 修复WVP作为下级平台接收DeviceStatus指令固定响应的问题 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 136 +++++++++++++++++++++++++++------------------ 1 files changed, 81 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 6392ada..f2ae60f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -1,41 +1,39 @@ package com.genersoft.iot.vmp.media.zlm; -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; - -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @description:閽堝 ZLMediaServer鐨刪ook浜嬩欢鐩戝惉 @@ -183,42 +181,45 @@ JSONObject ret = new JSONObject(); String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); + if (!"rtp".equals(param.getApp())) { - // 鎺ㄦ祦閴存潈 - if (param.getParams() == null) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); - ret.put("code", 401); - ret.put("msg", "Unauthorized"); - return ret; - } - Map<String, String> paramMap = urlParamToMap(param.getParams()); - String sign = paramMap.get("sign"); - if (sign == null) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); - ret.put("code", 401); - ret.put("msg", "Unauthorized"); - return ret; - } - // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜 - String callId = paramMap.get("callId"); - // 閴存潈閰嶇疆 - boolean hasAuthority = userService.checkPushAuthority(callId, sign); - if (!hasAuthority) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign); - ret.put("code", 401); - ret.put("msg", "Unauthorized"); - return ret; - } - StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); - streamAuthorityInfo.setCallId(callId); - streamAuthorityInfo.setSign(sign); - // 閴存潈閫氳繃 - redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); - // 閫氱煡assist鏂扮殑callId - if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) { - taskExecutor.execute(()->{ - assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); - }); + if (userSetting.getPushAuthority()) { + // 鎺ㄦ祦閴存潈 + if (param.getParams() == null) { + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + ret.put("code", 401); + ret.put("msg", "Unauthorized"); + return ret; + } + Map<String, String> paramMap = urlParamToMap(param.getParams()); + String sign = paramMap.get("sign"); + if (sign == null) { + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + ret.put("code", 401); + ret.put("msg", "Unauthorized"); + return ret; + } + // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜 + String callId = paramMap.get("callId"); + // 閴存潈閰嶇疆 + boolean hasAuthority = userService.checkPushAuthority(callId, sign); + if (!hasAuthority) { + logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign); + ret.put("code", 401); + ret.put("msg", "Unauthorized"); + return ret; + } + StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); + streamAuthorityInfo.setCallId(callId); + streamAuthorityInfo.setSign(sign); + // 閴存潈閫氳繃 + redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); + // 閫氱煡assist鏂扮殑callId + if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) { + taskExecutor.execute(()->{ + assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); + }); + } } }else { zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId()); @@ -226,7 +227,6 @@ ret.put("code", 0); ret.put("msg", "success"); - ret.put("enable_hls", false); if (!"rtp".equals(param.getApp())) { ret.put("enable_audio", true); @@ -348,7 +348,7 @@ } StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, param.getApp(), param.getStream(), tracks, callId); - param.setStreamInfo(streamInfoByAppAndStream); + param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); redisCatchStorage.addStream(mediaServerItem, type, param.getApp(), param.getStream(), param); if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() @@ -365,7 +365,7 @@ } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); +// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); } zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } @@ -424,7 +424,7 @@ logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); - // 褰曞儚涓嬭浇 + // 鍥芥爣绫诲瀷鐨勬祦 if ("rtp".equals(param.getApp())){ ret.put("close", userSetting.getStreamOnDemand()); // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇 @@ -506,6 +506,7 @@ // 淇敼鏁版嵁 streamProxyService.stop(param.getApp(), param.getStream()); }else { + // 鏃犱汉瑙傜湅涓嶅仛澶勭悊 ret.put("close", false); } return ret; @@ -527,7 +528,7 @@ @ResponseBody @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8") public JSONObject onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param){ - logger.info("[ZLM HOOK] 娴佹湭鎵惧埌锛歿}->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK] 娴佹湭鎵惧埌锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); taskExecutor.execute(()->{ MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); if (userSetting.isAutoApplyPlay() && mediaInfo != null) { @@ -595,7 +596,7 @@ @PostMapping(value = "/on_send_rtp_stopped", produces = "application/json;charset=UTF-8") public JSONObject onSendRtpStopped(HttpServletRequest request, @RequestBody OnSendRtpStoppedHookParam param){ - logger.info("[ZLM HOOK] 鍙戦�乺tp琚姩鍏抽棴锛歿}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK] rtp鍙戦�佸叧闂細{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); @@ -625,6 +626,31 @@ return ret; } + /** + * rtpServer鏀舵祦瓒呮椂 + */ + @ResponseBody + @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8") + public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){ + logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc()); + + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("msg", "success"); + + taskExecutor.execute(()->{ + JSONObject json = (JSONObject) JSON.toJSON(param); + List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); + if (subscribes != null && subscribes.size() > 0) { + for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { + subscribe.response(null, json); + } + } + }); + + return ret; + } + private Map<String, String> urlParamToMap(String params) { HashMap<String, String> map = new HashMap<>(); if (ObjectUtils.isEmpty(params)) { -- Gitblit v1.8.0