From 8f77d0c25cdd37d4cc96c923b46ae28607bae51d Mon Sep 17 00:00:00 2001
From: jiang <893224616@qq.com>
Date: 星期四, 18 八月 2022 16:17:23 +0800
Subject: [PATCH] 根据redis消息更新推流列表
---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 6 ++
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java | 8 ++
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java | 4 +
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java | 3 +
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 10 +++
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java | 8 ++
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java | 83 +++++++++++++++++++++++++++
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 4 +
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 6 ++
9 files changed, 130 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index 510b5b2..bbbfce9 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -91,6 +91,10 @@
* 鎺ユ敹鎺ㄦ祦璁惧鐨凣PS鍙樺寲閫氱煡
*/
public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
+ /**
+ * 鎺ユ敹鎺ㄦ祦璁惧鍒楄〃鏇存柊鍙樺寲閫氱煡
+ */
+ public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE";
/**
* redis 娑堟伅閫氱煡璁惧鎺ㄦ祦鍒板钩鍙�
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
index d2e1347..449a018 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
@@ -43,6 +43,9 @@
@Autowired
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
+ @Autowired
+ private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
+
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
@@ -80,6 +83,7 @@
container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
+ container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
return container;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
index 0a39206..61f94c2 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
@@ -3,6 +3,7 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
import java.util.List;
@@ -45,4 +46,11 @@
void sendCatalogMsg(GbStream gbStream, String type);
void sendCatalogMsgs(List<GbStream> gbStreams, String type);
+
+ /**
+ * 淇敼gbId鎴杗ame
+ * @param streamPushItemForUpdate
+ * @return
+ */
+ int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index b95ec48..5dbba92 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -100,4 +100,10 @@
* 澧炲姞鎺ㄦ祦
*/
boolean add(StreamPushItem stream);
+
+ /**
+ * 鑾峰彇鍏ㄩ儴鐨刟pp+Streanm 鐢ㄤ簬鍒ゆ柇鎺ㄦ祦鍒楄〃鏄柊澧炶繕鏄慨鏀�
+ * @return
+ */
+ List<String> getAllAppAndStream();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
index 8734882..0ce898e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
@@ -1,10 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
-import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
@@ -183,4 +182,9 @@
}
}
}
+
+ @Override
+ public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) {
+ return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
new file mode 100644
index 0000000..d70ddf1
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
@@ -0,0 +1,83 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.*;
+
+/**
+ * @Auther: JiangFeng
+ * @Date: 2022/8/16 11:32
+ * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
+ */
+@Component
+public class RedisPushStreamListMsgListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
+ @Resource
+ private IMediaServerService mediaServerService;
+
+ @Resource
+ private IStreamPushService streamPushService;
+ @Resource
+ private IGbStreamService gbStreamService;
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ //
+ logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
+ List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
+ //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+ List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+
+ /**
+ * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+ */
+ List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+ List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+ for (StreamPushItem streamPushItem : streamPushItems) {
+ String app = streamPushItem.getApp();
+ String stream = streamPushItem.getStream();
+ boolean contains = allAppAndStream.contains(app + stream);
+ //涓嶅瓨鍦ㄥ氨娣诲姞
+ if (!contains) {
+ streamPushItem.setStatus(false);
+ streamPushItem.setStreamType("push");
+ streamPushItem.setCreateTime(DateUtil.getNow());
+ streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+ streamPushItem.setOriginType(2);
+ streamPushItem.setOriginTypeStr("rtsp_push");
+ streamPushItem.setTotalReaderCount("0");
+ streamPushItemForSave.add(streamPushItem);
+ } else {
+ //瀛樺湪灏卞彧淇敼 name鍜実bId
+ streamPushItemForUpdate.add(streamPushItem);
+ }
+ }
+ if (streamPushItemForSave.size() > 0) {
+
+ logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+ logger.info(JSONObject.toJSONString(streamPushItemForSave));
+ streamPushService.batchAdd(streamPushItemForSave);
+
+ }
+ if(streamPushItemForUpdate.size()>0){
+ logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+ logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+ gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+ }
+
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 6c6c04b..ed59230 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -340,6 +340,7 @@
gbStreamMapper.batchAdd(streamPushItems);
}
+
@Override
public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
// 瀛樺偍鏁版嵁鍒皊tream_push琛�
@@ -503,4 +504,9 @@
}
return result;
}
+
+ @Override
+ public List<String> getAllAppAndStream() {
+ return streamPushMapper.getAllAppAndStream();
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index 7ed6b5a..df9143d 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -148,4 +148,14 @@
"SET mediaServerId=#{mediaServerId}" +
"WHERE app=#{app} AND stream=#{stream}")
void updateMediaServer(String app, String stream, String mediaServerId);
+
+ @Update("<script> "+
+ " <foreach collection='list' item='item' index='index' separator=';'>"+
+ "UPDATE gb_stream " +
+ " SET name=#{item.name},"+
+ " gbId=#{item.gbId}"+
+ " WHERE app=#{item.app} and stream=#{item.stream}"+
+ "</foreach>"+
+ "</script>")
+ int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
index b4ee81e..706de93 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -168,4 +168,7 @@
@Update("UPDATE stream_push SET status=0")
void setAllStreamOffline();
+
+ @Select("SELECT CONCAT(app,stream) FROM gb_stream")
+ List<String> getAllAppAndStream();
}
--
Gitblit v1.8.0