From d7a1b94f905c5f28c9c8f2d48c3f9e28ebcf9cc4 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期六, 24 九月 2022 21:04:58 +0800
Subject: [PATCH] Merge branch 'wvp-28181-2.0'

---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java |  103 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 103 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
new file mode 100644
index 0000000..b69a587
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -0,0 +1,103 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @Auther: JiangFeng
+ * @Date: 2022/8/16 11:32
+ * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
+ */
+@Component
+public class RedisPushStreamStatusListMsgListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
+    @Resource
+    private IMediaServerService mediaServerService;
+
+    @Resource
+    private IStreamPushService streamPushService;
+    @Resource
+    private IGbStreamService gbStreamService;
+
+    private boolean taskQueueHandlerRun = false;
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
+
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
+                    //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+                    List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+
+                    /**
+                     * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+                     */
+                    List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+                    List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+                    for (StreamPushItem streamPushItem : streamPushItems) {
+                        String app = streamPushItem.getApp();
+                        String stream = streamPushItem.getStream();
+                        boolean contains = allAppAndStream.contains(app + stream);
+                        //涓嶅瓨鍦ㄥ氨娣诲姞
+                        if (!contains) {
+                            streamPushItem.setStreamType("push");
+                            streamPushItem.setCreateTime(DateUtil.getNow());
+                            streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+                            streamPushItem.setOriginType(2);
+                            streamPushItem.setOriginTypeStr("rtsp_push");
+                            streamPushItem.setTotalReaderCount("0");
+                            streamPushItemForSave.add(streamPushItem);
+                        } else {
+                            //瀛樺湪灏卞彧淇敼 name鍜実bId
+                            streamPushItemForUpdate.add(streamPushItem);
+                        }
+                    }
+                    if (streamPushItemForSave.size() > 0) {
+
+                        logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+                        logger.info(JSONObject.toJSONString(streamPushItemForSave));
+                        streamPushService.batchAdd(streamPushItemForSave);
+
+                    }
+                    if(streamPushItemForUpdate.size()>0){
+                        logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+                        logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+                        gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
+    }
+}

--
Gitblit v1.8.0