From d7a1b94f905c5f28c9c8f2d48c3f9e28ebcf9cc4 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期六, 24 九月 2022 21:04:58 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 103 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java new file mode 100644 index 0000000..b69a587 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -0,0 +1,103 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * @Auther: JiangFeng + * @Date: 2022/8/16 11:32 + * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡 + */ +@Component +public class RedisPushStreamStatusListMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class); + @Resource + private IMediaServerService mediaServerService; + + @Resource + private IStreamPushService streamPushService; + @Resource + private IGbStreamService gbStreamService; + + private boolean taskQueueHandlerRun = false; + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody())); + + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); + //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀� + List<String> allAppAndStream = streamPushService.getAllAppAndStream(); + + /** + * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛� + */ + List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); + for (StreamPushItem streamPushItem : streamPushItems) { + String app = streamPushItem.getApp(); + String stream = streamPushItem.getStream(); + boolean contains = allAppAndStream.contains(app + stream); + //涓嶅瓨鍦ㄥ氨娣诲姞 + if (!contains) { + streamPushItem.setStreamType("push"); + streamPushItem.setCreateTime(DateUtil.getNow()); + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItem.setOriginType(2); + streamPushItem.setOriginTypeStr("rtsp_push"); + streamPushItem.setTotalReaderCount("0"); + streamPushItemForSave.add(streamPushItem); + } else { + //瀛樺湪灏卞彧淇敼 name鍜実bId + streamPushItemForUpdate.add(streamPushItem); + } + } + if (streamPushItemForSave.size() > 0) { + + logger.info("娣诲姞{}鏉�",streamPushItemForSave.size()); + logger.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); + + } + if(streamPushItemForUpdate.size()>0){ + logger.info("淇敼{}鏉�",streamPushItemForUpdate.size()); + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + } + taskQueueHandlerRun = false; + }); + } + } +} -- Gitblit v1.8.0