From 8f77d0c25cdd37d4cc96c923b46ae28607bae51d Mon Sep 17 00:00:00 2001 From: jiang <893224616@qq.com> Date: 星期四, 18 八月 2022 16:17:23 +0800 Subject: [PATCH] 根据redis消息更新推流列表 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 6 ++ src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java | 8 ++ src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java | 4 + src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java | 3 + src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 10 +++ src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java | 8 ++ src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java | 83 +++++++++++++++++++++++++++ src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 4 + src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 6 ++ 9 files changed, 130 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 510b5b2..bbbfce9 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -91,6 +91,10 @@ * 鎺ユ敹鎺ㄦ祦璁惧鐨凣PS鍙樺寲閫氱煡 */ public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE"; + /** + * 鎺ユ敹鎺ㄦ祦璁惧鍒楄〃鏇存柊鍙樺寲閫氱煡 + */ + public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE"; /** * redis 娑堟伅閫氱煡璁惧鎺ㄦ祦鍒板钩鍙� diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java index d2e1347..449a018 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java @@ -43,6 +43,9 @@ @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; + @Autowired + private RedisPushStreamListMsgListener redisPushStreamListMsgListener; + @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); @@ -80,6 +83,7 @@ container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); + container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); return container; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java index 0a39206..61f94c2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java @@ -3,6 +3,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; import java.util.List; @@ -45,4 +46,11 @@ void sendCatalogMsg(GbStream gbStream, String type); void sendCatalogMsgs(List<GbStream> gbStreams, String type); + + /** + * 淇敼gbId鎴杗ame + * @param streamPushItemForUpdate + * @return + */ + int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index b95ec48..5dbba92 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -100,4 +100,10 @@ * 澧炲姞鎺ㄦ祦 */ boolean add(StreamPushItem stream); + + /** + * 鑾峰彇鍏ㄩ儴鐨刟pp+Streanm 鐢ㄤ簬鍒ゆ柇鎺ㄦ祦鍒楄〃鏄柊澧炶繕鏄慨鏀� + * @return + */ + List<String> getAllAppAndStream(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java index 8734882..0ce898e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java @@ -1,10 +1,9 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper; @@ -183,4 +182,9 @@ } } } + + @Override + public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) { + return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java new file mode 100644 index 0000000..d70ddf1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java @@ -0,0 +1,83 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.*; + +/** + * @Auther: JiangFeng + * @Date: 2022/8/16 11:32 + * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡 + */ +@Component +public class RedisPushStreamListMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class); + @Resource + private IMediaServerService mediaServerService; + + @Resource + private IStreamPushService streamPushService; + @Resource + private IGbStreamService gbStreamService; + + @Override + public void onMessage(Message message, byte[] bytes) { + // + logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody())); + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class); + //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀� + List<String> allAppAndStream = streamPushService.getAllAppAndStream(); + + /** + * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛� + */ + List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); + for (StreamPushItem streamPushItem : streamPushItems) { + String app = streamPushItem.getApp(); + String stream = streamPushItem.getStream(); + boolean contains = allAppAndStream.contains(app + stream); + //涓嶅瓨鍦ㄥ氨娣诲姞 + if (!contains) { + streamPushItem.setStatus(false); + streamPushItem.setStreamType("push"); + streamPushItem.setCreateTime(DateUtil.getNow()); + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItem.setOriginType(2); + streamPushItem.setOriginTypeStr("rtsp_push"); + streamPushItem.setTotalReaderCount("0"); + streamPushItemForSave.add(streamPushItem); + } else { + //瀛樺湪灏卞彧淇敼 name鍜実bId + streamPushItemForUpdate.add(streamPushItem); + } + } + if (streamPushItemForSave.size() > 0) { + + logger.info("娣诲姞{}鏉�",streamPushItemForSave.size()); + logger.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); + + } + if(streamPushItemForUpdate.size()>0){ + logger.info("淇敼{}鏉�",streamPushItemForUpdate.size()); + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 6c6c04b..ed59230 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -340,6 +340,7 @@ gbStreamMapper.batchAdd(streamPushItems); } + @Override public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { // 瀛樺偍鏁版嵁鍒皊tream_push琛� @@ -503,4 +504,9 @@ } return result; } + + @Override + public List<String> getAllAppAndStream() { + return streamPushMapper.getAllAppAndStream(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 7ed6b5a..df9143d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -148,4 +148,14 @@ "SET mediaServerId=#{mediaServerId}" + "WHERE app=#{app} AND stream=#{stream}") void updateMediaServer(String app, String stream, String mediaServerId); + + @Update("<script> "+ + " <foreach collection='list' item='item' index='index' separator=';'>"+ + "UPDATE gb_stream " + + " SET name=#{item.name},"+ + " gbId=#{item.gbId}"+ + " WHERE app=#{item.app} and stream=#{item.stream}"+ + "</foreach>"+ + "</script>") + int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index b4ee81e..706de93 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -168,4 +168,7 @@ @Update("UPDATE stream_push SET status=0") void setAllStreamOffline(); + + @Select("SELECT CONCAT(app,stream) FROM gb_stream") + List<String> getAllAppAndStream(); } -- Gitblit v1.8.0