From c6dfb63f8fd5f04fa00ac6c45da2eb6bcc5cada4 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 25 三月 2024 23:59:50 +0800
Subject: [PATCH] 优化hook通知
---
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java | 4
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 42 +++
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 81 +++++
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java | 31 ++
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java | 190 +++++++++++++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java | 13
src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java | 65 ++++
src/main/java/com/genersoft/iot/vmp/media/bean/ResultForOnPublish.java | 59 ++++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java | 23
src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java | 75 +++++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 138 +--------
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java | 34 ++
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 26 +
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java | 3
14 files changed, 649 insertions(+), 135 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/ResultForOnPublish.java b/src/main/java/com/genersoft/iot/vmp/media/bean/ResultForOnPublish.java
new file mode 100644
index 0000000..88f7387
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/bean/ResultForOnPublish.java
@@ -0,0 +1,59 @@
+package com.genersoft.iot.vmp.media.bean;
+
+public class ResultForOnPublish {
+
+ private boolean enable_audio;
+ private boolean enable_mp4;
+ private int mp4_max_second;
+ private String mp4_save_path;
+ private String stream_replace;
+ private Integer modify_stamp;
+
+ public boolean isEnable_audio() {
+ return enable_audio;
+ }
+
+ public void setEnable_audio(boolean enable_audio) {
+ this.enable_audio = enable_audio;
+ }
+
+ public boolean isEnable_mp4() {
+ return enable_mp4;
+ }
+
+ public void setEnable_mp4(boolean enable_mp4) {
+ this.enable_mp4 = enable_mp4;
+ }
+
+ public int getMp4_max_second() {
+ return mp4_max_second;
+ }
+
+ public void setMp4_max_second(int mp4_max_second) {
+ this.mp4_max_second = mp4_max_second;
+ }
+
+ public String getMp4_save_path() {
+ return mp4_save_path;
+ }
+
+ public void setMp4_save_path(String mp4_save_path) {
+ this.mp4_save_path = mp4_save_path;
+ }
+
+ public String getStream_replace() {
+ return stream_replace;
+ }
+
+ public void setStream_replace(String stream_replace) {
+ this.stream_replace = stream_replace;
+ }
+
+ public Integer getModify_stamp() {
+ return modify_stamp;
+ }
+
+ public void setModify_stamp(Integer modify_stamp) {
+ this.modify_stamp = modify_stamp;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java
new file mode 100644
index 0000000..1b79b0c
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java
@@ -0,0 +1,75 @@
+package com.genersoft.iot.vmp.media.event;
+
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * 娴佸埌鏉ヤ簨浠�
+ */
+public class MediaArrivalEvent extends ApplicationEvent {
+ public MediaArrivalEvent(Object source) {
+ super(source);
+ }
+
+ public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){
+ MediaArrivalEvent mediaArrivalEvent = new MediaArrivalEvent(source);
+ mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam));
+ mediaArrivalEvent.setApp(hookParam.getApp());
+ mediaArrivalEvent.setStream(hookParam.getStream());
+ mediaArrivalEvent.setMediaServer(mediaServer);
+ mediaArrivalEvent.setSchema(hookParam.getSchema());
+ return mediaArrivalEvent;
+ }
+
+ private MediaInfo mediaInfo;
+
+ private String app;
+
+ private String stream;
+
+ private MediaServer mediaServer;
+
+ private String schema;
+
+ public MediaInfo getMediaInfo() {
+ return mediaInfo;
+ }
+
+ public void setMediaInfo(MediaInfo mediaInfo) {
+ this.mediaInfo = mediaInfo;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public MediaServer getMediaServer() {
+ return mediaServer;
+ }
+
+ public void setMediaServer(MediaServer mediaServer) {
+ this.mediaServer = mediaServer;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java
new file mode 100644
index 0000000..f111385
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java
@@ -0,0 +1,65 @@
+package com.genersoft.iot.vmp.media.event;
+
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * 娴佺寮�浜嬩欢
+ */
+public class MediaDepartureEvent extends ApplicationEvent {
+ public MediaDepartureEvent(Object source) {
+ super(source);
+ }
+
+ private String app;
+
+ private String stream;
+
+ private MediaServer mediaServer;
+
+ private String schema;
+
+ public static MediaDepartureEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){
+ MediaDepartureEvent mediaDepartureEven = new MediaDepartureEvent(source);
+ mediaDepartureEven.setApp(hookParam.getApp());
+ mediaDepartureEven.setStream(hookParam.getStream());
+ mediaDepartureEven.setSchema(hookParam.getSchema());
+ mediaDepartureEven.setMediaServer(mediaServer);
+ return mediaDepartureEven;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public MediaServer getMediaServer() {
+ return mediaServer;
+ }
+
+ public void setMediaServer(MediaServer mediaServer) {
+ this.mediaServer = mediaServer;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
index c5367fb..e4538e1 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -8,11 +8,15 @@
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -30,7 +34,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
@@ -72,6 +78,31 @@
/**
+ * 娴佸埌鏉ョ殑澶勭悊
+ */
+ @Async("taskExecutor")
+ @org.springframework.context.event.EventListener
+ public void onApplicationEvent(MediaArrivalEvent event) {
+ if ("rtsp".equals(event.getSchema())) {
+ logger.info("娴佸彉鍖栵細娉ㄥ唽 app->{}, stream->{}", event.getApp(), event.getStream());
+ addCount(event.getSeverId());
+ }
+ }
+
+ /**
+ * 娴佺寮�鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaDepartureEvent event) {
+ if ("rtsp".equals(event.getSchema())) {
+ logger.info("娴佸彉鍖栵細娉ㄩ攢, app->{}, stream->{}", event.getApp(), event.getStream());
+ removeCount(event.getSeverId());
+ }
+ }
+
+
+ /**
* 鍒濆鍖�
*/
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index b8f6f2f..ca52619 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -19,6 +19,9 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
@@ -190,50 +193,7 @@
if (mediaServer == null) {
return new HookResultForOnPublish(200, "success");
}
- // 鎺ㄦ祦閴存潈鐨勫鐞�
- if (!"rtp".equals(param.getApp())) {
- StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
- if (stream != null) {
- HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
- result.setEnable_audio(stream.isEnableAudio());
- result.setEnable_mp4(stream.isEnableMp4());
- return result;
- }
- if (userSetting.getPushAuthority()) {
- // 瀵逛簬鎺ㄦ祦杩涜閴存潈
- Map<String, String> paramMap = urlParamToMap(param.getParams());
- // 鎺ㄦ祦閴存潈
- if (param.getParams() == null) {
- logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
- return new HookResultForOnPublish(401, "Unauthorized");
- }
- String sign = paramMap.get("sign");
- if (sign == null) {
- logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
- return new HookResultForOnPublish(401, "Unauthorized");
- }
- // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜
- String callId = paramMap.get("callId");
- // 閴存潈閰嶇疆
- boolean hasAuthority = userService.checkPushAuthority(callId, sign);
- if (!hasAuthority) {
- logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign);
- return new HookResultForOnPublish(401, "Unauthorized");
- }
- StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
- streamAuthorityInfo.setCallId(callId);
- streamAuthorityInfo.setSign(sign);
- // 閴存潈閫氳繃
- redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
- }
- } else {
- zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
- }
-
-
- HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
- result.setEnable_audio(true);
taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
@@ -241,81 +201,16 @@
}
});
- // 鏄惁褰曞儚
- if ("rtp".equals(param.getApp())) {
- result.setEnable_mp4(userSetting.getRecordSip());
- } else {
- result.setEnable_mp4(userSetting.isRecordPushLive());
+ ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
+ if (resultForOnPublish != null) {
+ HookResultForOnPublish successResult = HookResultForOnPublish.getInstance(resultForOnPublish);
+ logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 鍝嶅簲锛歿}->{}->>>>{}", param.getMediaServerId(), param, successResult);
+ return successResult;
+ }else {
+ HookResultForOnPublish fail = HookResultForOnPublish.Fail();
+ logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 鍝嶅簲锛歿}->{}->>>>{}", param.getMediaServerId(), param, fail);
+ return fail;
}
- // 鍥芥爣娴�
- if ("rtp".equals(param.getApp())) {
-
- InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
- // 鍗曠鍙fā寮忎笅淇敼娴� ID
- if (!mediaServer.isRtpEnable() && inviteInfo == null) {
- String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
- inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
- if (inviteInfo != null) {
- result.setStream_replace(inviteInfo.getStream());
- logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
- }
- }
-
- // 璁剧疆闊抽淇℃伅鍙婂綍鍒朵俊鎭�
- List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream());
- if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
-
- // 涓哄綍鍒跺浗鏍囨ā鎷熶竴涓壌鏉冧俊鎭�, 鏂逛究鍚庣画鍐欏叆褰曞儚鏂囦欢鏃朵娇鐢�
- StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
- streamAuthorityInfo.setApp(param.getApp());
- streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
- streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
-
- redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
-
- String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
- String channelId = ssrcTransactionForAll.get(0).getChannelId();
- DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
- if (deviceChannel != null) {
- result.setEnable_audio(deviceChannel.isHasAudio());
- }
- // 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺
- if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
- // 鑾峰彇褰曞儚鐨勬�绘椂闀匡紝鐒跺悗璁剧疆涓鸿繖涓棰戠殑鏃堕暱
- InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, param.getStream());
- if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
- String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
- String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
- long difference = DateUtil.getDifference(startTime, endTime) / 1000;
- result.setMp4_max_second((int) difference);
- result.setEnable_mp4(true);
- // 璁剧疆涓�2淇濊瘉寰楀埌鐨刴p4鐨勬椂闀挎槸姝e父鐨�
- result.setModify_stamp(2);
- }
- }
- // 濡傛灉鏄痶alk瀵硅锛屽垯榛樿鑾峰彇澹伴煶
- if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
- result.setEnable_audio(true);
- }
- }
- } else if (param.getApp().equals("broadcast")) {
- result.setEnable_audio(true);
- } else if (param.getApp().equals("talk")) {
- result.setEnable_audio(true);
- }
- if (param.getApp().equalsIgnoreCase("rtp")) {
- String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
- OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
-
- String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream();
- OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
- if (otherRtpSendInfo != null || otherPsSendInfo != null) {
- result.setEnable_mp4(true);
- }
- }
- logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 鍝嶅簲锛歿}->{}->>>>{}", param.getMediaServerId(), param, result);
- return result;
}
@@ -326,11 +221,20 @@
@PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) {
+ MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
+
if (param.isRegist()) {
logger.info("[ZLM HOOK] 娴佹敞鍐�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+ MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer);
+ applicationEventPublisher.publishEvent(mediaArrivalEvent);
} else {
logger.info("[ZLM HOOK] 娴佹敞閿�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+ MediaDepartureEvent mediaArrivalEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
+ applicationEventPublisher.publishEvent(mediaArrivalEvent);
}
+ return HookResult.SUCCESS();
+
+
JSONObject json = (JSONObject) JSON.toJSON(param);
taskExecutor.execute(() -> {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java
index ef77225..7d66a1f 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.media.zlm.dto;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -97,21 +98,23 @@
this.sign = sign;
}
- public static StreamAuthorityInfo getInstanceByHook(OnPublishHookParam hookParam) {
+ public static StreamAuthorityInfo getInstanceByHook(String app, String stream, String id) {
StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo();
- streamAuthorityInfo.setApp(hookParam.getApp());
- streamAuthorityInfo.setStream(hookParam.getStream());
- streamAuthorityInfo.setId(hookParam.getId());
+ streamAuthorityInfo.setApp(app);
+ streamAuthorityInfo.setStream(stream);
+ streamAuthorityInfo.setId(id);
return streamAuthorityInfo;
}
- public static StreamAuthorityInfo getInstanceByHook(OnStreamChangedHookParam onStreamChangedHookParam) {
+ public static StreamAuthorityInfo getInstanceByHook(MediaArrivalEvent event) {
StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo();
- streamAuthorityInfo.setApp(onStreamChangedHookParam.getApp());
- streamAuthorityInfo.setStream(onStreamChangedHookParam.getStream());
- streamAuthorityInfo.setId(onStreamChangedHookParam.getMediaServerId());
- streamAuthorityInfo.setOriginType(onStreamChangedHookParam.getOriginType());
- streamAuthorityInfo.setOriginTypeStr(onStreamChangedHookParam.getOriginTypeStr());
+ streamAuthorityInfo.setApp(event.getApp());
+ streamAuthorityInfo.setStream(event.getStream());
+ streamAuthorityInfo.setId(event.getSeverId());
+ if (event.getMediaInfo() != null) {
+ streamAuthorityInfo.setOriginType(event.getMediaInfo().getOriginType());
+ }
+
return streamAuthorityInfo;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java
index b327f13..dee9d66 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java
@@ -18,8 +18,8 @@
return new HookResult(0, "success");
}
- public static HookResult Fail(){
- return new HookResult(-1, "fail");
+ public static HookResultForOnPublish Fail(){
+ return new HookResultForOnPublish(-1, "fail");
}
public int getCode() {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java
index 55369e5..33f9856 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto.hook;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
+
public class HookResultForOnPublish extends HookResult{
private boolean enable_audio;
@@ -16,6 +18,17 @@
return new HookResultForOnPublish(0, "success");
}
+ public static HookResultForOnPublish getInstance(ResultForOnPublish resultForOnPublish){
+ HookResultForOnPublish successResult = new HookResultForOnPublish(0, "success");
+ successResult.setEnable_audio(resultForOnPublish.isEnable_audio());
+ successResult.setEnable_mp4(resultForOnPublish.isEnable_mp4());
+ successResult.setModify_stamp(resultForOnPublish.getModify_stamp());
+ successResult.setStream_replace(resultForOnPublish.getStream_replace());
+ successResult.setMp4_max_second(resultForOnPublish.getMp4_max_second());
+ successResult.setMp4_save_path(resultForOnPublish.getMp4_save_path());
+ return successResult;
+ }
+
public HookResultForOnPublish(int code, String msg) {
setCode(code);
setMsg(msg);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
index 9230b35..4905455 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
/**
@@ -47,5 +48,5 @@
*/
boolean authenticatePlay(String app, String stream, String callId);
- boolean authenticatePublish(String app, String stream, String callId, String sign);
+ ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
index 9dc86f8..014c761 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -6,13 +6,18 @@
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -31,6 +36,35 @@
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
+ @Autowired
+ private IVideoManagerStorage storage;
+
+ /**
+ * 娴佸埌鏉ョ殑澶勭悊
+ */
+ @Async("taskExecutor")
+ @org.springframework.context.event.EventListener
+ public void onApplicationEvent(MediaArrivalEvent event) {
+ if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+
+ }
+ }
+
+ /**
+ * 娴佺寮�鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaDepartureEvent event) {
+ if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+ InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
+ if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
+ removeInviteInfo(inviteInfo);
+ stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
+ }
+ }
+ }
+
@Override
public void updateInviteInfo(InviteInfo inviteInfo) {
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
index 9c54068..21c3472 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -1,22 +1,43 @@
package com.genersoft.iot.vmp.service.impl;
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener;
+import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResultForOnPublish;
-import com.genersoft.iot.vmp.service.IMediaService;
+import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
+import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
@Service
public class MediaServiceImpl implements IMediaService {
@@ -31,6 +52,31 @@
@Autowired
private MediaConfig mediaConfig;
+
+ @Autowired
+ private IStreamProxyService streamProxyService;
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private RedisTemplate<Object, Object> redisTemplate;
+
+ @Autowired
+ private IUserService userService;
+
+ @Autowired
+ private IInviteStreamService inviteStreamService;
+
+ @Autowired
+ private VideoStreamSessionManager sessionManager;
+
+ @Autowired
+ private IVideoManagerStorage storager;
+
+ @Autowired
+ private ZLMMediaListManager zlmMediaListManager;
+
@Override
@@ -103,4 +149,146 @@
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
return (streamAuthorityInfo != null && streamAuthorityInfo.getCallId() != null && !streamAuthorityInfo.getCallId().equals(callId));
}
+
+ @Override
+ public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
+ // 鎺ㄦ祦閴存潈鐨勫鐞�
+ if (!"rtp".equals(app)) {
+ StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
+ if (streamProxyItem != null) {
+ ResultForOnPublish result = new ResultForOnPublish();
+ result.setEnable_audio(streamProxyItem.isEnableAudio());
+ result.setEnable_mp4(streamProxyItem.isEnableMp4());
+ return result;
+ }
+ if (userSetting.getPushAuthority()) {
+ // 瀵逛簬鎺ㄦ祦杩涜閴存潈
+ Map<String, String> paramMap = urlParamToMap(params);
+ // 鎺ㄦ祦閴存潈
+ if (params == null) {
+ logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
+ throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
+ }
+
+ String sign = paramMap.get("sign");
+ if (sign == null) {
+ logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
+ throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
+ }
+ // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜
+ String callId = paramMap.get("callId");
+ // 閴存潈閰嶇疆
+ boolean hasAuthority = userService.checkPushAuthority(callId, sign);
+ if (!hasAuthority) {
+ logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign);
+ throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
+ }
+ StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
+ streamAuthorityInfo.setCallId(callId);
+ streamAuthorityInfo.setSign(sign);
+ // 閴存潈閫氳繃
+ redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
+ }
+ } else {
+ zlmMediaListManager.sendStreamEvent(app, stream, mediaServer.getId());
+ }
+
+
+ ResultForOnPublish result = new ResultForOnPublish();
+ result.setEnable_audio(true);
+
+
+ // 鏄惁褰曞儚
+ if ("rtp".equals(app)) {
+ result.setEnable_mp4(userSetting.getRecordSip());
+ } else {
+ result.setEnable_mp4(userSetting.isRecordPushLive());
+ }
+ // 鍥芥爣娴�
+ if ("rtp".equals(app)) {
+
+ InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
+
+ // 鍗曠鍙fā寮忎笅淇敼娴� ID
+ if (!mediaServer.isRtpEnable() && inviteInfo == null) {
+ String ssrc = String.format("%010d", Long.parseLong(stream, 16));
+ inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
+ if (inviteInfo != null) {
+ result.setStream_replace(inviteInfo.getStream());
+ logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", stream, inviteInfo.getStream());
+ }
+ }
+
+ // 璁剧疆闊抽淇℃伅鍙婂綍鍒朵俊鎭�
+ List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
+ if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
+
+ // 涓哄綍鍒跺浗鏍囨ā鎷熶竴涓壌鏉冧俊鎭�, 鏂逛究鍚庣画鍐欏叆褰曞儚鏂囦欢鏃朵娇鐢�
+ StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
+ streamAuthorityInfo.setApp(app);
+ streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
+ streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
+
+ redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
+
+ String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
+ String channelId = ssrcTransactionForAll.get(0).getChannelId();
+ DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+ if (deviceChannel != null) {
+ result.setEnable_audio(deviceChannel.isHasAudio());
+ }
+ // 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺
+ if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
+ // 鑾峰彇褰曞儚鐨勬�绘椂闀匡紝鐒跺悗璁剧疆涓鸿繖涓棰戠殑鏃堕暱
+ InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
+ if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
+ String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
+ String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
+ long difference = DateUtil.getDifference(startTime, endTime) / 1000;
+ result.setMp4_max_second((int) difference);
+ result.setEnable_mp4(true);
+ // 璁剧疆涓�2淇濊瘉寰楀埌鐨刴p4鐨勬椂闀挎槸姝e父鐨�
+ result.setModify_stamp(2);
+ }
+ }
+ // 濡傛灉鏄痶alk瀵硅锛屽垯榛樿鑾峰彇澹伴煶
+ if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
+ result.setEnable_audio(true);
+ }
+ }
+ } else if (app.equals("broadcast")) {
+ result.setEnable_audio(true);
+ } else if (app.equals("talk")) {
+ result.setEnable_audio(true);
+ }
+ if (app.equalsIgnoreCase("rtp")) {
+ String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
+ OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
+
+ String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream;
+ OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
+ if (otherRtpSendInfo != null || otherPsSendInfo != null) {
+ result.setEnable_mp4(true);
+ }
+ }
+ return result;
+ }
+
+ private Map<String, String> urlParamToMap(String params) {
+ HashMap<String, String> map = new HashMap<>();
+ if (ObjectUtils.isEmpty(params)) {
+ return map;
+ }
+ String[] paramsArray = params.split("&");
+ if (paramsArray.length == 0) {
+ return map;
+ }
+ for (String param : paramsArray) {
+ String[] paramArray = param.split("=");
+ if (paramArray.length == 2) {
+ map.put(paramArray[0], paramArray[1]);
+ }
+ }
+ return map;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index ee84dc8..65cf693 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -17,17 +17,17 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -43,6 +43,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
@@ -121,6 +123,77 @@
@Autowired
private SSRCFactory ssrcFactory;
+ /**
+ * 娴佸埌鏉ョ殑澶勭悊
+ */
+ @Async("taskExecutor")
+ @org.springframework.context.event.EventListener
+ public void onApplicationEvent(MediaArrivalEvent event) {
+ if ("broadcast".equals(event.getApp())) {
+ if (event.getStream().indexOf("_") > 0) {
+ String[] streamArray = event.getStream().split("_");
+ if (streamArray.length == 2) {
+ String deviceId = streamArray[0];
+ String channelId = streamArray[1];
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null) {
+ logger.info("[璇煶瀵硅/鍠婅瘽] 鏈壘鍒拌澶囷細{}", deviceId);
+ return;
+ }
+ if ("broadcast".equals(event.getApp())) {
+ if (audioBroadcastManager.exit(deviceId, channelId)) {
+ stopAudioBroadcast(deviceId, channelId);
+ }
+ // 寮�鍚闊冲璁查�氶亾
+ try {
+ audioBroadcastCmd(device, channelId, event.getMediaServer(),
+ event.getApp(), event.getStream(), 60, false, (msg) -> {
+ logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId);
+ });
+ } catch (InvalidArgumentException | ParseException | SipException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 璇煶瀵硅: {}", e.getMessage());
+ }
+ }else if ("talk".equals(event.getApp())) {
+ // 寮�鍚闊冲璁查�氶亾
+ talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> {
+ logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId);
+ });
+ }
+ }
+ }
+ }
+
+
+ }
+
+ /**
+ * 娴佺寮�鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaDepartureEvent event) {
+ if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
+ if (event.getStream().indexOf("_") > 0) {
+ String[] streamArray = event.getStream().split("_");
+ if (streamArray.length == 2) {
+ String deviceId = streamArray[0];
+ String channelId = streamArray[1];
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null) {
+ logger.info("[璇煶瀵硅/鍠婅瘽] 鏈壘鍒拌澶囷細{}", deviceId);
+ return;
+ }
+ if ("broadcast".equals(event.getApp())) {
+ stopAudioBroadcast(deviceId, channelId);
+ }else if ("talk".equals(event.getApp())) {
+ stopTalk(device, channelId, false);
+ }
+
+ }
+ }
+ }
+ }
+
@Override
public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index ea824ff..dd0eae2 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -9,6 +9,8 @@
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -33,7 +35,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
@@ -102,6 +106,28 @@
@Autowired
TransactionDefinition transactionDefinition;
+ /**
+ * 娴佸埌鏉ョ殑澶勭悊
+ */
+ @Async("taskExecutor")
+ @org.springframework.context.event.EventListener
+ public void onApplicationEvent(MediaArrivalEvent event) {
+ if ("rtsp".equals(event.getSchema())) {
+ updateStatus(true, event.getApp(), event.getStream());
+ }
+ }
+
+ /**
+ * 娴佺寮�鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaDepartureEvent event) {
+ if ("rtsp".equals(event.getSchema())) {
+ updateStatus(true, event.getApp(), event.getStream());
+ }
+ }
+
@Override
public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index c5b7f58..2c1a476 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -10,6 +10,10 @@
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
@@ -28,7 +32,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
@@ -85,6 +91,42 @@
@Autowired
private MediaConfig mediaConfig;
+ /**
+ * 娴佸埌鏉ョ殑澶勭悊
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaArrivalEvent event) {
+ MediaInfo mediaInfo = event.getMediaInfo();
+ if (mediaInfo == null) {
+ return;
+ }
+ if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
+ && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
+ && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
+ return;
+ }
+
+ StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
+ if (streamAuthorityInfo == null) {
+ streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
+ } else {
+ streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
+ }
+ redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
+
+
+ }
+
+ /**
+ * 娴佺寮�鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaDepartureEvent event) {
+
+ }
+
private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
if (streamInfoList == null || streamInfoList.isEmpty()) {
--
Gitblit v1.8.0