jiang
2022-08-18 8f77d0c25cdd37d4cc96c923b46ae28607bae51d
根据redis消息更新推流列表
8个文件已修改
1个文件已添加
132 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -91,6 +91,10 @@
     * 接收推流设备的GPS变化通知
     */
    public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
    /**
     * Redis topic on which push-stream device list update notifications are received.
     */
    public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE";
    /**
     * redis 消息通知设备推流到平台
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
@@ -43,6 +43,9 @@
    // Listener registered on the VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE topic.
    @Autowired
    private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
    // Listener registered on the VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE topic.
    @Autowired
    private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
@@ -80,6 +83,7 @@
        container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
        container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        return container;
    }
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
@@ -3,6 +3,7 @@
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
import java.util.List;
@@ -45,4 +46,11 @@
    // NOTE(review): presumably sends a catalog message of the given type for a single stream;
    // the accepted values of `type` are not visible in this hunk — confirm against the implementation.
    void sendCatalogMsg(GbStream gbStream, String type);
    // NOTE(review): presumably the list variant of sendCatalogMsg — confirm against the implementation.
    void sendCatalogMsgs(List<GbStream> gbStreams, String type);
    /**
     * Updates the gbId and/or name of the given streams.
     *
     * @param streamPushItemForUpdate items identifying the streams to change and carrying the
     *                                new name and gbId values
     * @return the value reported by the underlying mapper update
     */
    int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -100,4 +100,10 @@
     * 增加推流
     */
    boolean add(StreamPushItem stream);
    /**
     * Returns all known app+stream keys. Used to decide whether an entry of an incoming
     * push-stream list is a new record or a modification of an existing one.
     *
     * @return one string per known stream, formed by concatenating app and stream
     */
    List<String> getAllAppAndStream();
}
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
@@ -1,10 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
@@ -183,4 +182,9 @@
            }
        }
    }
    @Override
    public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) {
        return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate);
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
New file
@@ -0,0 +1,83 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
/**
 * @Auther: JiangFeng
 * @Date: 2022/8/16 11:32
 * @Description: 接收redis发送的推流设备列表更新通知
 */
@Component
public class RedisPushStreamListMsgListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
    @Resource
    private IMediaServerService mediaServerService;
    @Resource
    private IStreamPushService streamPushService;
    @Resource
    private IGbStreamService gbStreamService;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        //
        logger.warn("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
        List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
        //查询全部的app+stream 用于判断是添加还是修改
        List<String> allAppAndStream = streamPushService.getAllAppAndStream();
        /**
         * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
         */
        List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
        List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
        for (StreamPushItem streamPushItem : streamPushItems) {
            String app = streamPushItem.getApp();
            String stream = streamPushItem.getStream();
            boolean contains = allAppAndStream.contains(app + stream);
            //不存在就添加
            if (!contains) {
                streamPushItem.setStatus(false);
                streamPushItem.setStreamType("push");
                streamPushItem.setCreateTime(DateUtil.getNow());
                streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
                streamPushItem.setOriginType(2);
                streamPushItem.setOriginTypeStr("rtsp_push");
                streamPushItem.setTotalReaderCount("0");
                streamPushItemForSave.add(streamPushItem);
            } else {
                //存在就只修改 name和gbId
                streamPushItemForUpdate.add(streamPushItem);
            }
        }
        if (streamPushItemForSave.size() > 0) {
            logger.info("添加{}条",streamPushItemForSave.size());
            logger.info(JSONObject.toJSONString(streamPushItemForSave));
            streamPushService.batchAdd(streamPushItemForSave);
        }
        if(streamPushItemForUpdate.size()>0){
            logger.info("修改{}条",streamPushItemForUpdate.size());
            logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
            gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -340,6 +340,7 @@
        gbStreamMapper.batchAdd(streamPushItems);
    }
    @Override
    public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
        // 存储数据到stream_push表
@@ -503,4 +504,9 @@
        }
        return result;
    }
    @Override
    public List<String> getAllAppAndStream() {
        return streamPushMapper.getAllAppAndStream();
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -148,4 +148,14 @@
            "SET mediaServerId=#{mediaServerId}" +
            "WHERE app=#{app} AND stream=#{stream}")
    void updateMediaServer(String app, String stream, String mediaServerId);
    /**
     * Updates {@code name} and {@code gbId} in {@code gb_stream} for every element of the list,
     * matching rows on {@code app} and {@code stream}. One UPDATE statement is rendered per
     * element and the statements are joined with ';'.
     *
     * NOTE(review): executing several ';'-separated statements in one call presumably requires
     * the JDBC connection to permit multi-statement execution (for MySQL, allowMultiQueries=true)
     * — confirm against the datasource configuration.
     * NOTE(review): with an empty list the foreach renders no statement at all, so callers
     * should only pass non-empty lists.
     * NOTE(review): for a multi-statement update the returned count may not be the total over
     * all statements — verify before relying on it.
     *
     * @param streamPushItemForUpdate items supplying app/stream to match and the name/gbId to set
     * @return the update count reported for the call
     */
    @Update("<script> "+
                " <foreach collection='list' item='item' index='index' separator=';'>"+
                    "UPDATE gb_stream " +
                    " SET name=#{item.name},"+
                    " gbId=#{item.gbId}"+
                    " WHERE app=#{item.app} and stream=#{item.stream}"+
                "</foreach>"+
            "</script>")
    int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -168,4 +168,7 @@
    /**
     * Sets {@code status} to 0 on every row of {@code stream_push}.
     */
    @Update("UPDATE stream_push SET status=0")
    void setAllStreamOffline();
    /**
     * Returns, for every row of {@code gb_stream}, the concatenation of {@code app} and
     * {@code stream} with no separator between them.
     *
     * NOTE(review): the query reads {@code gb_stream} although this mapper otherwise targets
     * {@code stream_push} — confirm that is intended.
     * NOTE(review): without a separator, different (app, stream) pairs can yield the same
     * string (e.g. "a"+"bc" and "ab"+"c").
     *
     * @return one concatenated app+stream string per {@code gb_stream} row
     */
    @Select("SELECT CONCAT(app,stream) FROM gb_stream")
    List<String> getAllAppAndStream();
}