From ef6693aabbbf12e83d09ad8749f6e60faacc012d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 29 十一月 2022 15:18:56 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' into Zafu-Dev-1127 --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 83 +++++++++++++++++++++-------------------- 1 files changed, 42 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index 15e37ec..d8ed1a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +17,8 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -38,7 +38,6 @@ @Resource private IGbStreamService gbStreamService; - private boolean taskQueueHandlerRun = false; private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @@ -49,54 +48,56 @@ @Override public void onMessage(Message message, byte[] bytes) { logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody())); - + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); - //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀� - List<String> allAppAndStream = streamPushService.getAllAppAndStream(); + try { + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); + //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀� + List<String> allAppAndStream = streamPushService.getAllAppAndStream(); - /** - * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛� - */ - List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); - List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); - for (StreamPushItem streamPushItem : streamPushItems) { - String app = streamPushItem.getApp(); - String stream = streamPushItem.getStream(); - boolean contains = allAppAndStream.contains(app + stream); - //涓嶅瓨鍦ㄥ氨娣诲姞 - if (!contains) { - streamPushItem.setStreamType("push"); - streamPushItem.setCreateTime(DateUtil.getNow()); - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); - streamPushItem.setOriginType(2); - streamPushItem.setOriginTypeStr("rtsp_push"); - streamPushItem.setTotalReaderCount("0"); - streamPushItemForSave.add(streamPushItem); - } else { - //瀛樺湪灏卞彧淇敼 name鍜実bId - streamPushItemForUpdate.add(streamPushItem); + /** + * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛� + */ + List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); + for (StreamPushItem streamPushItem : streamPushItems) { + String app = streamPushItem.getApp(); + String stream = streamPushItem.getStream(); + boolean contains = allAppAndStream.contains(app + stream); + //涓嶅瓨鍦ㄥ氨娣诲姞 + if (!contains) { + streamPushItem.setStreamType("push"); + streamPushItem.setCreateTime(DateUtil.getNow()); + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItem.setOriginType(2); + streamPushItem.setOriginTypeStr("rtsp_push"); + streamPushItem.setTotalReaderCount("0"); + streamPushItemForSave.add(streamPushItem); + } else { + //瀛樺湪灏卞彧淇敼 name鍜実bId + streamPushItemForUpdate.add(streamPushItem); + } } - } - if (streamPushItemForSave.size() > 0) { + if (streamPushItemForSave.size() > 0) { - logger.info("娣诲姞{}鏉�",streamPushItemForSave.size()); - logger.info(JSONObject.toJSONString(streamPushItemForSave)); - streamPushService.batchAdd(streamPushItemForSave); + logger.info("娣诲姞{}鏉�",streamPushItemForSave.size()); + logger.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); - } - if(streamPushItemForUpdate.size()>0){ - logger.info("淇敼{}鏉�",streamPushItemForUpdate.size()); - logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); - gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + if(streamPushItemForUpdate.size()>0){ + logger.info("淇敼{}鏉�",streamPushItemForUpdate.size()); + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + }catch (Exception e) { + logger.warn("[REDIS鐨凙LARM閫氱煡] 鍙戠幇鏈鐞嗙殑寮傚父, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } } -- Gitblit v1.8.0