From ab81136765f1b641223b982b2baef13e06307fe4 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 08 十二月 2021 16:45:50 +0800
Subject: [PATCH] 优化适配zlm的hook保活
---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 98 +++++++++-
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java | 20 +
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java | 65 +++++++
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java | 6
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java | 11 +
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java | 12 +
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java | 8
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java | 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 73 ++++---
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 16 +
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 6
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java | 5
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java | 52 +++++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 1
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 14 +
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java | 11 +
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 28 ++-
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 42 +++
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java | 44 ++++
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 6
20 files changed, 441 insertions(+), 79 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index f3d542c..2f62287 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -1,9 +1,7 @@
package com.genersoft.iot.vmp.gb28181;
import com.genersoft.iot.vmp.conf.SipConfig;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
-import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import org.slf4j.Logger;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
index fd0cfdc..9495e9d 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
+import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@@ -73,5 +74,10 @@
outEvent.setMediaServerId(mediaServerId);
applicationEventPublisher.publishEvent(outEvent);
}
-
+
+ public void zlmOnlineEventPublish(String mediaServerId) {
+ ZLMOnlineEvent outEvent = new ZLMOnlineEvent(this);
+ outEvent.setMediaServerId(mediaServerId);
+ applicationEventPublisher.publishEvent(outEvent);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 7065558..8c1239e 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -179,29 +179,33 @@
public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
logger.debug("[ ZLM HOOK ]on_publish API璋冪敤锛屽弬鏁帮細" + json.toString());
-
+ JSONObject ret = new JSONObject();
+ ret.put("code", 0);
+ ret.put("msg", "success");
+ ret.put("enableHls", true);
+ ret.put("enableMP4", userSetup.isRecordPushLive());
String mediaServerId = json.getString("mediaServerId");
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
if (subscribe != null) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
+ }else {
+ ret.put("code", 1);
+ ret.put("msg", "zlm not register");
}
}
String app = json.getString("app");
String stream = json.getString("stream");
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream);
- JSONObject ret = new JSONObject();
+
// 褰曞儚鍥炴斁鏃朵笉杩涜褰曞儚涓嬭浇
if (streamInfo != null) {
ret.put("enableMP4", false);
}else {
ret.put("enableMP4", userSetup.isRecordPushLive());
}
- ret.put("code", 0);
- ret.put("msg", "success");
- ret.put("enableHls", true);
- ret.put("enableMP4", userSetup.isRecordPushLive());
+
return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
}
@@ -340,37 +344,38 @@
if (!"rtp".equals(app)){
String type = OriginType.values()[item.getOriginType()].getType();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
- if (regist) {
- StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
- redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
- if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
- || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
- || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
- zlmMediaListManager.addPush(item);
- }
- }else {
- // 鍏煎娴佹敞閿�鏃剁被鍨嬮敊璇殑闂锛岀瓑zlm鏇存柊鍚庡垹闄�
- StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
- if (streamPushItem != null) {
- type = "PUSH";
- }else {
- StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
- if (streamProxyByAppAndStream != null) {
- type = "PULL";
+ if (mediaServerItem != null){
+ if (regist) {
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
+ redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
+ if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+ zlmMediaListManager.addPush(item);
}
+ }else {
+ // 鍏煎娴佹敞閿�鏃剁被鍨嬮敊璇殑闂锛岀瓑zlm鏇存柊鍚庡垹闄�
+ StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
+ if (streamPushItem != null) {
+ type = "PUSH";
+ }else {
+ StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
+ if (streamProxyByAppAndStream != null) {
+ type = "PULL";
+ }
+ }
+ zlmMediaListManager.removeMedia(app, streamId);
+ redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
}
- zlmMediaListManager.removeMedia(app, streamId);
- redisCatchStorage.removeStream(mediaServerItem, type, app, streamId);
+ // 鍙戦�佹祦鍙樺寲redis娑堟伅
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("app", app);
+ jsonObject.put("stream", streamId);
+ jsonObject.put("register", regist);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
}
-
- // 鍙戦�佹祦鍙樺寲redis娑堟伅
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetup.getServerId());
- jsonObject.put("app", app);
- jsonObject.put("stream", streamId);
- jsonObject.put("register", regist);
- jsonObject.put("mediaServerId", mediaServerId);
- redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
index f185d82..5b7ba1c 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -141,7 +141,6 @@
}else {
gbStreamMapper.add(transform);
}
-
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
index 5555617..4315c8d 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.MediaConfig;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
@@ -17,6 +18,7 @@
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
import java.util.*;
@@ -36,6 +38,9 @@
@Autowired
private IStreamProxyService streamProxyService;
+
+ @Autowired
+ private EventPublisher publisher;
@Autowired
private IMediaServerService mediaServerService;
@@ -117,7 +122,7 @@
@Async
public void connectZlmServer(MediaServerItem mediaServerItem){
- ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem);
+ ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem, 1);
if (zlmServerConfig != null) {
zlmServerConfig.setIp(mediaServerItem.getIp());
zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
@@ -126,7 +131,7 @@
}
}
- public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem) {
+ public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem, int index) {
if (startGetMedia == null) { return null;}
if (!mediaServerItem.isDefaultServer() && mediaServerService.getOne(mediaServerItem.getId()) == null) {
return null;
@@ -143,14 +148,19 @@
ZLMServerConfig.setIp(mediaServerItem.getIp());
}
} else {
- logger.error("[ {} ]-[ {}:{} ]涓诲姩杩炴帴澶辫触澶辫触, 2s鍚庨噸璇�",
- mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
+ logger.error("[ {} ]-[ {}:{} ]绗瑊}娆′富鍔ㄨ繛鎺ュけ璐�, 2s鍚庨噸璇�",
+ mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index);
+ if (index == 1 && !StringUtils.isEmpty(mediaServerItem.getId())) {
+ logger.info("[ {} ]-[ {}:{} ]绗瑊}娆′富鍔ㄨ繛鎺ュけ璐�, 寮�濮嬫竻鐞嗙浉鍏宠祫婧�",
+ mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index);
+ publisher.zlmOfflineEventPublish(mediaServerItem.getId());
+ }
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
- ZLMServerConfig = getMediaServerConfig(mediaServerItem);
+ ZLMServerConfig = getMediaServerConfig(mediaServerItem, index += 1);
}
return ZLMServerConfig;
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
new file mode 100644
index 0000000..33a251c
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
@@ -0,0 +1,52 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**
+ * @description:璁惧蹇冭烦瓒呮椂鐩戝惉,鍊熷姪redis杩囨湡鐗规�э紝杩涜鐩戝惉锛岀洃鍚埌璇存槑璁惧蹇冭烦瓒呮椂锛屽彂閫佺绾夸簨浠�
+ * @author: swwheihei
+ * @date: 2020骞�5鏈�6鏃� 涓婂崍11:35:46
+ */
+@Component
+public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListener {
+
+ private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
+
+ @Autowired
+ private EventPublisher publisher;
+
+ @Autowired
+ private UserSetup userSetup;
+
+ public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
+ super(listenerContainer);
+ }
+
+ /**
+ * 鐩戝惉澶辨晥鐨刱ey锛宬ey鏍煎紡涓簁eeplive_deviceId
+ * @param message
+ * @param pattern
+ */
+ @Override
+ public void onMessage(Message message, byte[] pattern) {
+ // 鑾峰彇澶辨晥鐨刱ey
+ String expiredKey = message.toString();
+ String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_";
+ if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
+ return;
+ }
+
+ String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
+
+ publisher.zlmOfflineEventPublish(mediaServerId);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java
new file mode 100644
index 0000000..8207bdd
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java
@@ -0,0 +1,11 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+/**
+ * zlm绂荤嚎浜嬩欢绫�
+ */
+public class ZLMOfflineEvent extends ZLMEventAbstract {
+
+ public ZLMOfflineEvent(Object source) {
+ super(source);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java
new file mode 100644
index 0000000..b713552
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java
@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ */
+@Component
+public class ZLMOfflineEventListener implements ApplicationListener<ZLMOfflineEvent> {
+
+ private final static Logger logger = LoggerFactory.getLogger(ZLMOfflineEventListener.class);
+
+ @Autowired
+ private IMediaServerService mediaServerService;
+
+ @Autowired
+ private IStreamPushService streamPushService;
+
+ @Autowired
+ private IStreamProxyService streamProxyService;
+
+ @Override
+ public void onApplicationEvent(ZLMOfflineEvent event) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("ZLM绂荤嚎浜嬩欢瑙﹀彂锛孖D锛�" + event.getMediaServerId());
+ }
+ // 澶勭悊ZLM绂荤嚎
+ mediaServerService.zlmServerOffline(event.getMediaServerId());
+ streamProxyService.zlmServerOffline(event.getMediaServerId());
+ streamPushService.zlmServerOffline(event.getMediaServerId());
+ // TODO 澶勭悊瀵瑰浗鏍囩殑褰卞搷
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java
new file mode 100644
index 0000000..612ff9d
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java
@@ -0,0 +1,11 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+/**
+ * zlm鍦ㄧ嚎浜嬩欢
+ */
+public class ZLMOnlineEvent extends ZLMEventAbstract {
+
+ public ZLMOnlineEvent(Object source) {
+ super(source);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java
new file mode 100644
index 0000000..5731ea0
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java
@@ -0,0 +1,65 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+
+/**
+ * @description: 鍦ㄧ嚎浜嬩欢鐩戝惉鍣紝鐩戝惉鍒扮绾垮悗锛屼慨鏀硅澶囩鍦ㄧ嚎鐘舵�併�� 璁惧鍦ㄧ嚎鏈変袱涓潵婧愶細
+ * 1銆佽澶囦富鍔ㄦ敞閿�锛屽彂閫佹敞閿�鎸囦护
+ * 2銆佽澶囨湭鐭ュ師鍥犵绾匡紝蹇冭烦瓒呮椂
+ * @author: swwheihei
+ * @date: 2020骞�5鏈�6鏃� 涓嬪崍1:51:23
+ */
+@Component
+public class ZLMOnlineEventListener implements ApplicationListener<ZLMOnlineEvent> {
+
+ private final static Logger logger = LoggerFactory.getLogger(ZLMOnlineEventListener.class);
+
+ @Autowired
+ private IVideoManagerStorager storager;
+
+ @Autowired
+ private RedisUtil redis;
+
+ @Autowired
+ private SipConfig sipConfig;
+
+ @Autowired
+ private UserSetup userSetup;
+
+ @Autowired
+ private IMediaServerService mediaServerService;
+
+ @Autowired
+ private IStreamPushService streamPushService;
+
+ @Autowired
+ private IStreamProxyService streamProxyService;
+
+ private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ @Override
+ public void onApplicationEvent(ZLMOnlineEvent event) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("ZLM涓婄嚎浜嬩欢瑙﹀彂锛孖D锛�" + event.getMediaServerId());
+ }
+ streamPushService.zlmServerOnline(event.getMediaServerId());
+ streamProxyService.zlmServerOnline(event.getMediaServerId());
+
+
+
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
index 618b824..40b2c9a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
@@ -78,10 +78,10 @@
/**
* 鏂扮殑鑺傜偣鍔犲叆
- * @param zlmServerConfig
+ * @param mediaServerId
* @return
*/
- void zlmServerOnline(ZLMServerConfig zlmServerConfig);
+ void zlmServerOnline(String mediaServerId);
/**
* 鑺傜偣绂荤嚎
@@ -89,4 +89,6 @@
* @return
*/
void zlmServerOffline(String mediaServerId);
+
+ void clean();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index d8a4465..d228a1a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -34,6 +34,7 @@
* @return
*/
PageInfo<StreamPushItem> getPushList(Integer page, Integer count);
+ List<StreamPushItem> getPushList(String mediaSererId);
StreamPushItem transform(MediaItem item);
@@ -49,10 +50,10 @@
/**
* 鏂扮殑鑺傜偣鍔犲叆
- * @param zlmServerConfig
+ * @param mediaServerId
* @return
*/
- void zlmServerOnline(ZLMServerConfig zlmServerConfig);
+ void zlmServerOnline(String mediaServerId);
/**
* 鑺傜偣绂荤嚎
@@ -61,4 +62,5 @@
*/
void zlmServerOffline(String mediaServerId);
+ void clean();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index e02bd3f..7d9f748 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -4,10 +4,10 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -69,6 +69,9 @@
@Autowired
private RedisUtil redisUtil;
+
+ @Autowired
+ private EventPublisher publisher;
@Autowired
JedisUtil jedisUtil;
@@ -312,8 +315,6 @@
return mediaServerMapper.update(mediaSerItem);
}
-
-
/**
* 澶勭悊zlm涓婄嚎
* @param zlmServerConfig zlm涓婄嚎鎼哄甫鐨勫弬鏁�
@@ -353,27 +354,30 @@
if (serverItem.getRtpProxyPort() == 0) {
serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
}
- if (StringUtils.isEmpty(serverItem.getId())) {
- serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
- }
serverItem.setStatus(true);
+
if (StringUtils.isEmpty(serverItem.getId())) {
serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
mediaServerMapper.updateByHostAndPort(serverItem);
}else {
mediaServerMapper.update(serverItem);
}
- String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId();
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
if (redisUtil.get(key) == null) {
- SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain());
+ SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain());
serverItem.setSsrcConfig(ssrcConfig);
- redisUtil.set(key, serverItem);
+ }else {
+ MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
+ serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
}
-
+ redisUtil.set(key, serverItem);
resetOnlineServerItem(serverItem);
updateMediaServerKeepalive(serverItem.getId(), null);
setZLMConfig(serverItem);
+ publisher.zlmOnlineEventPublish(serverItem.getId());
+
}
+
@Override
public void zlmServerOffline(String mediaServerId) {
@@ -567,6 +571,10 @@
@Override
public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
MediaServerItem mediaServerItem = getOne(mediaServerId);
+ if (mediaServerItem == null) {
+ logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
+ return;
+ }
String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
redisUtil.set(key, data, hookAliveInterval);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index 907893d..19bf13a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -2,6 +2,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -28,8 +29,7 @@
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
/**
* 瑙嗛浠g悊涓氬姟
@@ -53,6 +53,9 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private UserSetup userSetup;
@Autowired
private GbStreamMapper gbStreamMapper;
@@ -160,6 +163,9 @@
}else {
mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
}
+ if (mediaServerItem == null) {
+ return null;
+ }
if ("default".equals(param.getType())){
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
param.isEnable_hls(), param.isEnable_mp4(), param.getRtp_type());
@@ -244,7 +250,6 @@
}
}
}
-
return result;
}
@@ -255,18 +260,41 @@
}
@Override
- public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
-
+ public void zlmServerOnline(String mediaServerId) {
+ zlmServerOffline(mediaServerId);
}
@Override
public void zlmServerOffline(String mediaServerId) {
// 绉婚櫎寮�鍚簡鏃犱汉瑙傜湅鑷姩绉婚櫎鐨勬祦
+ List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
+ if (streamProxyItemList.size() > 0) {
+ gbStreamMapper.batchDel(streamProxyItemList);
+ }
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 鍏朵粬鐨勬祦璁剧疆鏈惎鐢�
streamProxyMapper.updateStatus(false, mediaServerId);
- // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
- redisCatchStorage.removeStream(mediaServerId, "PULL");
+ String type = "PULL";
+
+ // 鍙戦�乺edis娑堟伅
+ List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+ if (streamInfoList.size() > 0) {
+ for (StreamInfo streamInfo : streamInfoList) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("app", streamInfo.getApp());
+ jsonObject.put("stream", streamInfo.getStreamId());
+ jsonObject.put("register", false);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+ redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
+ }
+ }
+ }
+
+ @Override
+ public void clean() {
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 7c17c2a..dcca0e5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -3,11 +3,15 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.OriginType;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
@@ -20,10 +24,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
@Service
public class StreamPushServiceImpl implements IStreamPushService {
@@ -44,6 +45,9 @@
private IRedisCatchStorage redisCatchStorage;
@Autowired
+ private UserSetup userSetup;
+
+ @Autowired
private IMediaServerService mediaServerService;
@Override
@@ -56,7 +60,9 @@
for (MediaItem item : mediaItems) {
// 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴�
- if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
+ if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
String key = item.getApp() + "_" + item.getStream();
StreamPushItem streamPushItem = result.get(key);
if (streamPushItem == null) {
@@ -95,6 +101,11 @@
PageHelper.startPage(page, count);
List<StreamPushItem> all = streamPushMapper.selectAll();
return new PageInfo<>(all);
+ }
+
+ @Override
+ public List<StreamPushItem> getPushList(String mediaServerId) {
+ return streamPushMapper.selectAllByMediaServerId(mediaServerId);
}
@Override
@@ -137,17 +148,84 @@
}
@Override
- public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
- // 浼间箮娌″暐闇�瑕佸仛鐨�
+ public void zlmServerOnline(String mediaServerId) {
+ // 鍚屾zlm鎺ㄦ祦淇℃伅
+ MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+ if (mediaServerItem == null) {
+ return;
+ }
+ List<StreamPushItem> pushList = getPushList(mediaServerId);
+ if (pushList.size() > 0) {
+ Map<String, StreamPushItem> pushItemMap = new HashMap<>();
+ for (StreamPushItem streamPushItem : pushList) {
+ pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
+ }
+ zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
+ if (mediaList == null) return;
+ String dataStr = mediaList.getString("data");
+
+ Integer code = mediaList.getInteger("code");
+ List<StreamPushItem> streamPushItems = null;
+ if (code == 0 ) {
+ if (dataStr != null) {
+ streamPushItems = handleJSON(dataStr, mediaServerItem);
+ }
+ }
+
+ if (streamPushItems != null) {
+ for (StreamPushItem streamPushItem : streamPushItems) {
+ pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+ }
+ }
+ Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
+ if (offlinePushItems.size() > 0) {
+ String type = "PUSH";
+ streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
+ for (StreamPushItem offlinePushItem : offlinePushItems) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("app", offlinePushItem.getApp());
+ jsonObject.put("stream", offlinePushItem.getStream());
+ jsonObject.put("register", false);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+ redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
+ }
+ }
+ }));
+ }
}
@Override
public void zlmServerOffline(String mediaServerId) {
- // 绉婚櫎娌℃湁serverId鐨勬帹娴�
+ List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
+ // 绉婚櫎娌℃湁GBId鐨勬帹娴�
streamPushMapper.deleteWithoutGBId(mediaServerId);
+ gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
// 鍏朵粬鐨勬祦璁剧疆鏈惎鐢�
gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
- // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
- redisCatchStorage.removeStream(mediaServerId, "PUSH");
+ // 鍙戦�佹祦鍋滄娑堟伅
+ String type = "PUSH";
+ // 鍙戦�乺edis娑堟伅
+ List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+ if (streamInfoList.size() > 0) {
+ for (StreamInfo streamInfo : streamInfoList) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("app", streamInfo.getApp());
+ jsonObject.put("stream", streamInfo.getStreamId());
+ jsonObject.put("register", false);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+ redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
+ }
+ }
+ }
+
+ @Override
+ public void clean() {
+
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 0803dd6..4f240d8 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -140,11 +140,11 @@
/**
* 绉婚櫎娴佷俊鎭粠redis
- * @param mediaServerItem
+ * @param mediaServerId
* @param app
* @param streamId
*/
- void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId);
+ void removeStream(String mediaServerId, String type, String app, String streamId);
/**
@@ -167,4 +167,6 @@
* @return
*/
ThirdPartyGB queryMemberNoGBId(String queryKey);
+
+ List<StreamInfo> getStreams(String mediaServerId, String pull);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index f159925..9757b13 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -65,4 +65,18 @@
"SET status=${status} " +
"WHERE mediaServerId=#{mediaServerId} ")
void updateStatusByMediaServerId(String mediaServerId, boolean status);
+
+ @Select("SELECT * FROM gb_stream WHERE mediaServerId=#{mediaServerId}")
+ void delByMediaServerId(String mediaServerId);
+
+ @Delete("DELETE FROM gb_stream WHERE streamType=#{type} AND gbId=NULL AND mediaServerId=#{mediaServerId}")
+ void deleteWithoutGBId(String type, String mediaServerId);
+
+ @Delete("<script> "+
+ "DELETE FROM gb_stream where " +
+ "<foreach collection='streamProxyItemList' item='item' separator='or'>" +
+ "(app=#{item.app} and stream=#{item.stream}) " +
+ "</foreach>" +
+ "</script>")
+ void batchDel(List<StreamProxyItem> streamProxyItemList);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
index 82520ec..b6e1ba1 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -62,6 +62,9 @@
"WHERE mediaServerId=#{mediaServerId}")
void updateStatus(boolean status, String mediaServerId);
- @Delete("DELETE FROM stream_proxy WHERE mediaServerId=#{mediaServerId}")
+ @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
+
+ @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.mediaServerId=#{mediaServerId} order by st.createTime desc")
+ List<StreamProxyItem> selecAutoRemoveItemByMediaServerId(String mediaServerId);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
index 9fe6ebf..c5b22f8 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -4,6 +4,7 @@
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
+import java.util.Collection;
import java.util.List;
@Mapper
@@ -31,6 +32,14 @@
@Delete("DELETE FROM stream_push WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
+ @Delete("<script> "+
+ "DELETE FROM stream_push where " +
+ "<foreach collection='streamPushItems' item='item' separator='or'>" +
+ "(app=#{item.app} and stream=#{item.stream}) " +
+ "</foreach>" +
+ "</script>")
+ int delAll(List<StreamPushItem> streamPushItems);
+
@Select("SELECT st.*, pgs.gbId, pgs.status, pgs.name, pgs.longitude, pgs.latitude FROM stream_push st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream")
List<StreamPushItem> selectAll();
@@ -56,4 +65,7 @@
@Delete("DELETE FROM stream_push WHERE mediaServerId=#{mediaServerId}")
void deleteWithoutGBId(String mediaServerId);
+ @Select("SELECT * FROM stream_push WHERE mediaServerId=#{mediaServerId}")
+ List<StreamPushItem> selectAllByMediaServerId(String mediaServerId);
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 6adc05d..af9a206 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -338,8 +338,8 @@
}
@Override
- public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) {
- String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
+ public void removeStream(String mediaServerId, String type, String app, String streamId) {
+ String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerId;
redis.del(key);
}
@@ -365,4 +365,16 @@
redis.del((String) stream);
}
}
+
+ @Override
+ public List<StreamInfo> getStreams(String mediaServerId, String type) {
+ List<StreamInfo> result = new ArrayList<>();
+ String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId;
+ List<Object> streams = redis.scan(key);
+ for (Object stream : streams) {
+ StreamInfo streamInfo = (StreamInfo)redis.get((String) stream);
+ result.add(streamInfo);
+ }
+ return result;
+ }
}
--
Gitblit v1.8.0