From a574ff094428decbdc35332d184cd0d210716a44 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 22 九月 2022 16:56:20 +0800
Subject: [PATCH] 修复使用队列导致的问题

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java |    4 +-
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                                                              |   33 +++++++++-------
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java                                         |    4 +-
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java                                          |    4 +-
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java          |   13 +++---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java                                                      |    2 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java                                                   |    6 +-
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java                                     |    2 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java                                |    2 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java                                                    |    8 ++-
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java                                                   |    4 +-
 11 files changed, 44 insertions(+), 38 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index 206a159..84bf0b8 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -79,7 +79,7 @@
 
 	private boolean taskQueueHandlerRun = false;
 
-	private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
+	private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
 
 	@Qualifier("taskExecutor")
 	@Autowired
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
index da2cb9c..ec75015 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -72,7 +72,7 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -92,13 +92,14 @@
         if (!taskQueueHandlerRun) {
             taskQueueHandlerRun = true;
             taskExecutor.execute(() -> {
+                logger.info("[澶勭悊鎶ヨ閫氱煡]寰呭鐞嗘暟閲忥細{}", taskQueue.size() );
                 while (!taskQueue.isEmpty()) {
                     SipMsgInfo sipMsgInfo = taskQueue.poll();
                     // 鍥炲200 OK
                     try {
                         responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
                     } catch (SipException | InvalidArgumentException | ParseException e) {
-                        logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
+                        logger.error("[澶勭悊鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
                     }
 
                     Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
@@ -112,7 +113,7 @@
                     deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
                     String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
                     if (alarmTime == null) {
-                        return;
+                        continue;
                     }
                     deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
                     String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
@@ -182,7 +183,7 @@
                             deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
                         }
                     }
-
+                    logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
                     if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
                         // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡
                         AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
@@ -190,7 +191,7 @@
                         alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
                         alarmChannelMessage.setGbId(channelId);
                         redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
-                        return;
+                        continue;
                     }
 
                     logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�");
@@ -198,7 +199,7 @@
                     if (sipConfig.isAlarm()) {
                         deviceAlarmService.add(deviceAlarm);
                     }
-                    logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
+
                     if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
                         publisher.deviceAlarmEventPublish(deviceAlarm);
                     }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
index 67d26d7..652cd83 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -58,7 +58,7 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -83,7 +83,7 @@
                         if (rootElementAfterCharset == null) {
                             logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
                             responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
-                            return;
+                            continue;
                         }
                         MobilePosition mobilePosition = new MobilePosition();
                         mobilePosition.setCreateTime(DateUtil.getNow());
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 6662738..cdbbf33 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -131,6 +131,24 @@
         StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
         playResult.setDevice(device);
 
+        result.onCompletion(()->{
+            // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
+            taskExecutor.execute(()->{
+                // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
+                String path =  "snap";
+                String fileName =  deviceId + "_" + channelId + ".jpg";
+                WVPResult wvpResult =  (WVPResult)result.getResult();
+                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+                    MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
+                    String streamUrl = streamInfoForSuccess.getFmp4();
+                    // 璇锋眰鎴浘
+                    logger.info("[璇锋眰鎴浘]: " + fileName);
+                    zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+                }
+            });
+        });
+
         if (streamInfo != null) {
             String streamId = streamInfo.getStream();
             if (streamId == null) {
@@ -192,21 +210,6 @@
             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
             logger.info(JSONObject.toJSONString(ssrcInfo));
             play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
-                // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
-                taskExecutor.execute(()->{
-                    // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
-                    String path =  "snap";
-                    String fileName =  deviceId + "_" + channelId + ".jpg";
-                    WVPResult wvpResult =  (WVPResult)result.getResult();
-                    if (Objects.requireNonNull(wvpResult).getCode() == 0) {
-                        StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
-                        MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
-                        String streamUrl = streamInfoForSuccess.getFmp4();
-                        // 璇锋眰鎴浘
-                        logger.info("[璇锋眰鎴浘]: " + fileName);
-                        zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
-                    }
-                });
                 if (hookEvent != null) {
                     hookEvent.response(mediaServerItem, response);
                 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
index 84f5ef7..8d1b066 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -16,6 +16,7 @@
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
+import javax.validation.constraints.NotNull;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -36,19 +37,20 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
     private ThreadPoolTaskExecutor taskExecutor;
 
     @Override
-    public void onMessage(Message message, byte[] bytes) {
+    public void onMessage(@NotNull Message message, byte[] bytes) {
         logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody()));
 
         taskQueue.offer(message);
         if (!taskQueueHandlerRun) {
             taskQueueHandlerRun = true;
+            logger.info("[绾跨▼姹犱俊鎭痌娲诲姩绾跨▼鏁帮細{}, 鏈�澶х嚎绋嬫暟锛� {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
             taskExecutor.execute(() -> {
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
@@ -56,7 +58,7 @@
                     AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
                     if (alarmChannelMessage == null) {
                         logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触");
-                        return;
+                        continue;
                     }
                     String gbId = alarmChannelMessage.getGbId();
 
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
index e6ba9b6..2d4a82f 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -90,7 +90,7 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -121,7 +121,7 @@
                     JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
                     WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
                     if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
-                        return;
+                        continue;
                     }
                     if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
                         logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
index bb2f4ad..b5d02a5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
@@ -35,7 +35,7 @@
     @Autowired
     private IVideoManagerStorage storager;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
index 13e6874..4fff32d 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -28,7 +28,7 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -53,7 +53,7 @@
                     MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
                     if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
                         logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
-                        return;
+                        continue;
                     }
                     // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
                     if (responseEvents.get(response.getApp() + response.getStream()) != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
index 3925836..b69a587 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -40,7 +40,7 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
index 4b1c2d5..2faf3b8 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
@@ -42,7 +42,7 @@
 
 
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -61,7 +61,7 @@
                     PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
                     if (statusChangeFromPushStream == null) {
                         logger.warn("[REDIS娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�");
-                        return;
+                        continue;
                     }
                     // 鍙栨秷瀹氭椂浠诲姟
                     dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
index 1897b6f..415787e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -35,7 +35,7 @@
 
     private boolean taskQueueHandlerRun = false;
 
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -53,13 +53,13 @@
                     JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
                     if (steamMsgJson == null) {
                         logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
-                        return;
+                        continue;
                     }
                     String serverId = steamMsgJson.getString("serverId");
 
                     if (userSetting.getServerId().equals(serverId)) {
                         // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
-                        return;
+                        continue;
                     }
                     logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
                     String app = steamMsgJson.getString("app");

--
Gitblit v1.8.0