From 724963324aaa63feca2c6aed13785dc8485bc02c Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 26 六月 2023 19:06:14 +0800
Subject: [PATCH] 修复上级点播时如果推流信息中mediaServerID错误的情况

---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java |   90 +++++++++++++++++++++++----------------------
 1 files changed, 46 insertions(+), 44 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
index 3925836..cb34ff5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -1,12 +1,11 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IGbStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,7 +17,8 @@
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -38,9 +38,8 @@
     @Resource
     private IGbStreamService gbStreamService;
 
-    private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -49,54 +48,57 @@
     @Override
     public void onMessage(Message message, byte[] bytes) {
         logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
-
+        boolean isEmpty = taskQueue.isEmpty();
         taskQueue.offer(message);
-        if (!taskQueueHandlerRun) {
-            taskQueueHandlerRun = true;
+        if (isEmpty) {
             taskExecutor.execute(() -> {
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
-                    List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
-                    //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
-                    List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+                    try {
+                        List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
+                        //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+                        List<String> allAppAndStream = streamPushService.getAllAppAndStream();
 
-                    /**
-                     * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
-                     */
-                    List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
-                    List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
-                    for (StreamPushItem streamPushItem : streamPushItems) {
-                        String app = streamPushItem.getApp();
-                        String stream = streamPushItem.getStream();
-                        boolean contains = allAppAndStream.contains(app + stream);
-                        //涓嶅瓨鍦ㄥ氨娣诲姞
-                        if (!contains) {
-                            streamPushItem.setStreamType("push");
-                            streamPushItem.setCreateTime(DateUtil.getNow());
-                            streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
-                            streamPushItem.setOriginType(2);
-                            streamPushItem.setOriginTypeStr("rtsp_push");
-                            streamPushItem.setTotalReaderCount("0");
-                            streamPushItemForSave.add(streamPushItem);
-                        } else {
-                            //瀛樺湪灏卞彧淇敼 name鍜実bId
-                            streamPushItemForUpdate.add(streamPushItem);
+                        /**
+                         * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+                         */
+                        List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+                        List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+                        for (StreamPushItem streamPushItem : streamPushItems) {
+                            String app = streamPushItem.getApp();
+                            String stream = streamPushItem.getStream();
+                            boolean contains = allAppAndStream.contains(app + stream);
+                            //涓嶅瓨鍦ㄥ氨娣诲姞
+                            if (!contains) {
+                                streamPushItem.setStreamType("push");
+                                streamPushItem.setCreateTime(DateUtil.getNow());
+                                streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+                                streamPushItem.setOriginType(2);
+                                streamPushItem.setOriginTypeStr("rtsp_push");
+                                streamPushItem.setTotalReaderCount("0");
+                                streamPushItemForSave.add(streamPushItem);
+                            } else {
+                                //瀛樺湪灏卞彧淇敼 name鍜実bId
+                                streamPushItemForUpdate.add(streamPushItem);
+                            }
                         }
-                    }
-                    if (streamPushItemForSave.size() > 0) {
+                        if (streamPushItemForSave.size() > 0) {
 
-                        logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
-                        logger.info(JSONObject.toJSONString(streamPushItemForSave));
-                        streamPushService.batchAdd(streamPushItemForSave);
+                            logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+                            logger.info(JSONObject.toJSONString(streamPushItemForSave));
+                            streamPushService.batchAdd(streamPushItemForSave);
 
-                    }
-                    if(streamPushItemForUpdate.size()>0){
-                        logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
-                        logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
-                        gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+                        }
+                        if(streamPushItemForUpdate.size()>0){
+                            logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+                            logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+                            gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+                        }
+                    }catch (Exception e) {
+                        logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+                        logger.error("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 寮傚父鍐呭锛� ", e);
                     }
                 }
-                taskQueueHandlerRun = false;
             });
         }
     }

--
Gitblit v1.8.0