From a2cac5ca12f100d052cb31122a84de4bba829bca Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 04 一月 2024 18:34:46 +0800
Subject: [PATCH] Merge branch '2.6.9' into wvp-28181-2.0
---
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 90 +++++++++++++++++++++++----------------------
1 files changed, 46 insertions(+), 44 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
old mode 100644
new mode 100755
index 3925836..cb34ff5
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -1,12 +1,11 @@
package com.genersoft.iot.vmp.service.redisMsg;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,7 +17,8 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -38,9 +38,8 @@
@Resource
private IGbStreamService gbStreamService;
- private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -49,54 +48,57 @@
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
-
+ boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message);
- if (!taskQueueHandlerRun) {
- taskQueueHandlerRun = true;
+ if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
- List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
- //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
- List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+ try {
+ List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
+ //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+ List<String> allAppAndStream = streamPushService.getAllAppAndStream();
- /**
- * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
- */
- List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
- List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
- for (StreamPushItem streamPushItem : streamPushItems) {
- String app = streamPushItem.getApp();
- String stream = streamPushItem.getStream();
- boolean contains = allAppAndStream.contains(app + stream);
- //涓嶅瓨鍦ㄥ氨娣诲姞
- if (!contains) {
- streamPushItem.setStreamType("push");
- streamPushItem.setCreateTime(DateUtil.getNow());
- streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
- streamPushItem.setOriginType(2);
- streamPushItem.setOriginTypeStr("rtsp_push");
- streamPushItem.setTotalReaderCount("0");
- streamPushItemForSave.add(streamPushItem);
- } else {
- //瀛樺湪灏卞彧淇敼 name鍜実bId
- streamPushItemForUpdate.add(streamPushItem);
+ /**
+ * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+ */
+ List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+ List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+ for (StreamPushItem streamPushItem : streamPushItems) {
+ String app = streamPushItem.getApp();
+ String stream = streamPushItem.getStream();
+ boolean contains = allAppAndStream.contains(app + stream);
+ //涓嶅瓨鍦ㄥ氨娣诲姞
+ if (!contains) {
+ streamPushItem.setStreamType("push");
+ streamPushItem.setCreateTime(DateUtil.getNow());
+ streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+ streamPushItem.setOriginType(2);
+ streamPushItem.setOriginTypeStr("rtsp_push");
+ streamPushItem.setTotalReaderCount("0");
+ streamPushItemForSave.add(streamPushItem);
+ } else {
+ //瀛樺湪灏卞彧淇敼 name鍜実bId
+ streamPushItemForUpdate.add(streamPushItem);
+ }
}
- }
- if (streamPushItemForSave.size() > 0) {
+ if (streamPushItemForSave.size() > 0) {
- logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
- logger.info(JSONObject.toJSONString(streamPushItemForSave));
- streamPushService.batchAdd(streamPushItemForSave);
+ logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+ logger.info(JSONObject.toJSONString(streamPushItemForSave));
+ streamPushService.batchAdd(streamPushItemForSave);
- }
- if(streamPushItemForUpdate.size()>0){
- logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
- logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
- gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+ }
+ if(streamPushItemForUpdate.size()>0){
+ logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+ logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+ gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+ }
+ }catch (Exception e) {
+ logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+ logger.error("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊] 寮傚父鍐呭锛� ", e);
}
}
- taskQueueHandlerRun = false;
});
}
}
--
Gitblit v1.8.0