From bc38f5ef299f44f65fd34258b84272a027c10cb6 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 27 七月 2022 14:48:21 +0800
Subject: [PATCH] 修复流地址返回错误

---
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                        |    8 +-
 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java                              |    4 
 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java                           |   29 ++++++++-
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java |   49 +++++++--------
 src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java              |   72 +++++++++++++++---------
 5 files changed, 100 insertions(+), 62 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index e191578..25261f3 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -92,39 +92,36 @@
 	@Override
 	public void process(RequestEvent evt) {
 		try {
-
 			taskQueue.offer(new HandlerCatchData(evt, null, null));
 			responseAck(evt, Response.OK);
 			if (!taskQueueHandlerRun) {
 				taskQueueHandlerRun = true;
 				taskExecutor.execute(()-> {
-							while (!taskQueue.isEmpty()) {
-								try {
-									HandlerCatchData take = taskQueue.poll();
-									Element rootElement = getRootElement(take.getEvt());
-									String cmd = XmlUtil.getText(rootElement, "CmdType");
+					while (!taskQueue.isEmpty()) {
+						try {
+							HandlerCatchData take = taskQueue.poll();
+							Element rootElement = getRootElement(take.getEvt());
+							String cmd = XmlUtil.getText(rootElement, "CmdType");
 
-									if (CmdType.CATALOG.equals(cmd)) {
-										logger.info("鎺ユ敹鍒癈atalog閫氱煡");
-										processNotifyCatalogList(take.getEvt());
-									} else if (CmdType.ALARM.equals(cmd)) {
-										logger.info("鎺ユ敹鍒癆larm閫氱煡");
-										processNotifyAlarm(take.getEvt());
-									} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
-										logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
-										processNotifyMobilePosition(take.getEvt());
-									} else {
-										logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
-									}
-								} catch (DocumentException e) {
-									throw new RuntimeException(e);
-								}
+							if (CmdType.CATALOG.equals(cmd)) {
+								logger.info("鎺ユ敹鍒癈atalog閫氱煡");
+								processNotifyCatalogList(take.getEvt());
+							} else if (CmdType.ALARM.equals(cmd)) {
+								logger.info("鎺ユ敹鍒癆larm閫氱煡");
+								processNotifyAlarm(take.getEvt());
+							} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+								logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
+								processNotifyMobilePosition(take.getEvt());
+							} else {
+								logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
 							}
-						taskQueueHandlerRun = false;
-						});
+						} catch (DocumentException e) {
+							throw new RuntimeException(e);
+						}
+					}
+				taskQueueHandlerRun = false;
+				});
 			}
-
-
 		} catch (SipException | InvalidArgumentException | ParseException e) {
 			e.printStackTrace();
 		}
@@ -174,7 +171,7 @@
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[鏀跺埌 绉诲姩浣嶇疆璁㈤槄]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+			logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
 					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
index fb7cea0..c712045 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -67,9 +67,9 @@
                 JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
                 JSONArray tracks = mediaJSON.getJSONArray("tracks");
                 if (authority) {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId());
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,streamAuthorityInfo.getCallId());
                 }else {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null);
                 }
 
             }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
index 7482833..d5a26e7 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
@@ -1,15 +1,20 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * 鎺ユ敹鏉ヨ嚜redis鐨凣PS鏇存柊閫氱煡
@@ -20,13 +25,31 @@
 
     private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
 
+    private boolean taskQueueHandlerRun = false;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
     @Override
     public void onMessage(@NotNull Message message, byte[] bytes) {
-        // TODO 鍔犳秷鎭槦鍒�
-        GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
-        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
+                    redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
index 90ed8d4..8606702 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
@@ -14,6 +14,7 @@
 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -21,14 +22,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 
 /**
@@ -40,6 +44,8 @@
 
     private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
 
+    private boolean taskQueueHandlerRun = false;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
@@ -47,39 +53,51 @@
     private IStreamPushService streamPushService;
 
     @Autowired
-    private EventPublisher eventPublisher;
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
     private DynamicTask dynamicTask;
+
+
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
         // TODO 澧炲姞闃熷垪
         logger.warn("[REDIS 娑堟伅-鎺ㄦ祦璁惧鐘舵�佸彉鍖朷锛� {}", new String(message.getBody()));
-        //
-        PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class);
-        if (statusChangeFromPushStream == null) {
-            logger.warn("[REDIS 娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�");
-            return;
-        }
-        // 鍙栨秷瀹氭椂浠诲姟
-        dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
-        if (statusChangeFromPushStream.isSetAllOffline()) {
-            // 鎵�鏈夎澶囩绾�
-            streamPushService.allStreamOffline();
-        }
-        if (statusChangeFromPushStream.getOfflineStreams() != null
-                && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
-            // 鏇存柊閮ㄥ垎璁惧绂荤嚎
-            streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
-        }
-        if (statusChangeFromPushStream.getOnlineStreams() != null &&
-                statusChangeFromPushStream.getOnlineStreams().size() > 0) {
-            // 鏇存柊閮ㄥ垎璁惧涓婄嚎
-            streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
+        taskQueue.offer(message);
+
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
+                    if (statusChangeFromPushStream == null) {
+                        logger.warn("[REDIS 娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�");
+                        return;
+                    }
+                    // 鍙栨秷瀹氭椂浠诲姟
+                    dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
+                    if (statusChangeFromPushStream.isSetAllOffline()) {
+                        // 鎵�鏈夎澶囩绾�
+                        streamPushService.allStreamOffline();
+                    }
+                    if (statusChangeFromPushStream.getOfflineStreams() != null
+                            && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
+                        // 鏇存柊閮ㄥ垎璁惧绂荤嚎
+                        streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
+                    }
+                    if (statusChangeFromPushStream.getOnlineStreams() != null &&
+                            statusChangeFromPushStream.getOnlineStreams().size() > 0) {
+                        // 鏇存柊閮ㄥ垎璁惧涓婄嚎
+                        streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
         }
     }
 
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 5b2e515..31294a0 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -688,21 +688,21 @@
     @Override
     public void sendMobilePositionMsg(JSONObject jsonObject) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
-        logger.info("[redis 绉诲姩浣嶇疆璁㈤槄閫氱煡] {}: {}", key, jsonObject.toString());
+        logger.info("[redis鍙戦�侀�氱煡]绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString());
         redis.convertAndSend(key, jsonObject);
     }
 
     @Override
     public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
         String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
-        logger.info("[redis 鎺ㄦ祦琚姹傞�氱煡] {}: {}/{}", key, msg.getApp(), msg.getStream());
+        logger.info("[redis鍙戦�侀�氱煡]鎺ㄦ祦琚姹� {}: {}/{}", key, msg.getApp(), msg.getStream());
         redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
     }
 
     @Override
     public void sendAlarmMsg(AlarmChannelMessage msg) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
-        logger.info("[redis 鎶ヨ閫氱煡] {}: {}", key, JSON.toJSON(msg));
+        logger.info("[redis鍙戦�侀�氱煡] 鎶ヨ{}: {}", key, JSON.toJSON(msg));
         redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
     }
 
@@ -715,7 +715,7 @@
     @Override
     public void sendStreamPushRequestedMsgForStatus() {
         String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
-        logger.info("[redis 閫氱煡]鑾峰彇鎵�鏈夋帹娴佽澶囩殑鐘舵��");
+        logger.info("[redis閫氱煡]鑾峰彇鎵�鏈夋帹娴佽澶囩殑鐘舵��");
         JSONObject jsonObject = new JSONObject();
         jsonObject.put(key, key);
         redis.convertAndSend(key, jsonObject);

--
Gitblit v1.8.0