src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -91,6 +91,10 @@ * 接收推流设备的GPS变化通知 */ public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE"; /** * 接收推流设备列表更新变化通知 */ public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE"; /** * redis 消息通知设备推流到平台 src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
@@ -43,6 +43,9 @@ @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired private RedisPushStreamListMsgListener redisPushStreamListMsgListener; @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); @@ -80,6 +83,7 @@ container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); return container; } src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
@@ -3,6 +3,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; import java.util.List; @@ -45,4 +46,11 @@ void sendCatalogMsg(GbStream gbStream, String type); void sendCatalogMsgs(List<GbStream> gbStreams, String type); /** * 修改gbId或name * @param streamPushItemForUpdate * @return */ int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate); } src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -100,4 +100,10 @@ * 增加推流 */ boolean add(StreamPushItem stream); /** * 获取全部的app+Streanm 用于判断推流列表是新增还是修改 * @return */ List<String> getAllAppAndStream(); } src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
@@ -1,10 +1,9 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper; @@ -183,4 +182,9 @@ } } } @Override public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) { return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate); } } src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
New file @@ -0,0 +1,83 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.*; /** * @Auther: JiangFeng * @Date: 2022/8/16 11:32 * @Description: 接收redis发送的推流设备列表更新通知 */ @Component public class RedisPushStreamListMsgListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class); @Resource private IMediaServerService mediaServerService; @Resource private IStreamPushService streamPushService; @Resource private IGbStreamService gbStreamService; @Override public void onMessage(Message message, byte[] bytes) { // logger.warn("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class); //查询全部的app+stream 用于判断是添加还是修改 List<String> allAppAndStream = streamPushService.getAllAppAndStream(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 */ List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); for (StreamPushItem streamPushItem : streamPushItems) { String app = streamPushItem.getApp(); String stream = streamPushItem.getStream(); boolean contains = allAppAndStream.contains(app + stream); //不存在就添加 if (!contains) { streamPushItem.setStatus(false); streamPushItem.setStreamType("push"); streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); streamPushItem.setOriginType(2); streamPushItem.setOriginTypeStr("rtsp_push"); streamPushItem.setTotalReaderCount("0"); streamPushItemForSave.add(streamPushItem); } else { //存在就只修改 name和gbId streamPushItemForUpdate.add(streamPushItem); } } if (streamPushItemForSave.size() > 0) { logger.info("添加{}条",streamPushItemForSave.size()); logger.info(JSONObject.toJSONString(streamPushItemForSave)); streamPushService.batchAdd(streamPushItemForSave); } if(streamPushItemForUpdate.size()>0){ logger.info("修改{}条",streamPushItemForUpdate.size()); logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); gbStreamService.updateGbIdOrName(streamPushItemForUpdate); } } } src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -340,6 +340,7 @@ gbStreamMapper.batchAdd(streamPushItems); } @Override public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { // 存储数据到stream_push表 @@ -503,4 +504,9 @@ } return result; } @Override public List<String> getAllAppAndStream() { return streamPushMapper.getAllAppAndStream(); } } src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -148,4 +148,14 @@ "SET mediaServerId=#{mediaServerId}" + "WHERE app=#{app} AND stream=#{stream}") void updateMediaServer(String app, String stream, String mediaServerId); @Update("<script> "+ " <foreach collection='list' item='item' index='index' separator=';'>"+ "UPDATE gb_stream " + " SET name=#{item.name},"+ " gbId=#{item.gbId}"+ " WHERE app=#{item.app} and stream=#{item.stream}"+ "</foreach>"+ "</script>") int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate); } src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -168,4 +168,7 @@ @Update("UPDATE stream_push SET status=0") void setAllStreamOffline(); @Select("SELECT CONCAT(app,stream) FROM gb_stream") List<String> getAllAppAndStream(); }