From ef6693aabbbf12e83d09ad8749f6e60faacc012d Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 29 十一月 2022 15:18:56 +0800
Subject: [PATCH] Merge branch 'wvp-28181-2.0' into Zafu-Dev-1127

---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java |   83 +++++++++++++++++++++--------------------
 1 files changed, 42 insertions(+), 41 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
index 15e37ec..d8ed1a0 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -6,7 +6,6 @@
 import com.genersoft.iot.vmp.service.IGbStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,7 +17,8 @@
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -38,7 +38,6 @@
     @Resource
     private IGbStreamService gbStreamService;
 
-    private boolean taskQueueHandlerRun = false;
 
     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
@@ -49,54 +48,56 @@
     @Override
     public void onMessage(Message message, byte[] bytes) {
         logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
-
+        boolean isEmpty = taskQueue.isEmpty();
         taskQueue.offer(message);
-        if (!taskQueueHandlerRun) {
-            taskQueueHandlerRun = true;
+        if (isEmpty) {
             taskExecutor.execute(() -> {
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
-                    List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
-                    //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
-                    List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+                    try {
+                        List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
+                        //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+                        List<String> allAppAndStream = streamPushService.getAllAppAndStream();
 
-                    /**
-                     * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
-                     */
-                    List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
-                    List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
-                    for (StreamPushItem streamPushItem : streamPushItems) {
-                        String app = streamPushItem.getApp();
-                        String stream = streamPushItem.getStream();
-                        boolean contains = allAppAndStream.contains(app + stream);
-                        //涓嶅瓨鍦ㄥ氨娣诲姞
-                        if (!contains) {
-                            streamPushItem.setStreamType("push");
-                            streamPushItem.setCreateTime(DateUtil.getNow());
-                            streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
-                            streamPushItem.setOriginType(2);
-                            streamPushItem.setOriginTypeStr("rtsp_push");
-                            streamPushItem.setTotalReaderCount("0");
-                            streamPushItemForSave.add(streamPushItem);
-                        } else {
-                            //瀛樺湪灏卞彧淇敼 name鍜実bId
-                            streamPushItemForUpdate.add(streamPushItem);
+                        /**
+                         * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+                         */
+                        List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+                        List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+                        for (StreamPushItem streamPushItem : streamPushItems) {
+                            String app = streamPushItem.getApp();
+                            String stream = streamPushItem.getStream();
+                            boolean contains = allAppAndStream.contains(app + stream);
+                            //涓嶅瓨鍦ㄥ氨娣诲姞
+                            if (!contains) {
+                                streamPushItem.setStreamType("push");
+                                streamPushItem.setCreateTime(DateUtil.getNow());
+                                streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+                                streamPushItem.setOriginType(2);
+                                streamPushItem.setOriginTypeStr("rtsp_push");
+                                streamPushItem.setTotalReaderCount("0");
+                                streamPushItemForSave.add(streamPushItem);
+                            } else {
+                                //瀛樺湪灏卞彧淇敼 name鍜実bId
+                                streamPushItemForUpdate.add(streamPushItem);
+                            }
                         }
-                    }
-                    if (streamPushItemForSave.size() > 0) {
+                        if (streamPushItemForSave.size() > 0) {
 
-                        logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
-                        logger.info(JSONObject.toJSONString(streamPushItemForSave));
-                        streamPushService.batchAdd(streamPushItemForSave);
+                            logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+                            logger.info(JSONObject.toJSONString(streamPushItemForSave));
+                            streamPushService.batchAdd(streamPushItemForSave);
 
-                    }
-                    if(streamPushItemForUpdate.size()>0){
-                        logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
-                        logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
-                        gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+                        }
+                        if(streamPushItemForUpdate.size()>0){
+                            logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+                            logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+                            gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+                        }
+                    }catch (Exception e) {
+                        logger.warn("[REDIS鐨凙LARM閫氱煡] 鍙戠幇鏈鐞嗙殑寮傚父, {}",e.getMessage());
                     }
                 }
-                taskQueueHandlerRun = false;
             });
         }
     }

--
Gitblit v1.8.0