From a574ff094428decbdc35332d184cd0d210716a44 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 22 九月 2022 16:56:20 +0800
Subject: [PATCH] 修复使用队列导致的问题
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java | 4 +-
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 33 +++++++++-------
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java | 4 +-
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 4 +-
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java | 13 +++---
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java | 2
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java | 6 +-
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 2
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java | 8 ++-
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 4 +-
11 files changed, 44 insertions(+), 38 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index 206a159..84bf0b8 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -79,7 +79,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
index da2cb9c..ec75015 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -72,7 +72,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -92,13 +92,14 @@
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(() -> {
+ logger.info("[澶勭悊鎶ヨ閫氱煡]寰呭鐞嗘暟閲忥細{}", taskQueue.size() );
while (!taskQueue.isEmpty()) {
SipMsgInfo sipMsgInfo = taskQueue.poll();
// 鍥炲200 OK
try {
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
+ logger.error("[澶勭悊鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
}
Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
@@ -112,7 +113,7 @@
deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
if (alarmTime == null) {
- return;
+ continue;
}
deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
@@ -182,7 +183,7 @@
deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
}
}
-
+ logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
// 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡
AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
@@ -190,7 +191,7 @@
alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
alarmChannelMessage.setGbId(channelId);
redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
- return;
+ continue;
}
logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�");
@@ -198,7 +199,7 @@
if (sipConfig.isAlarm()) {
deviceAlarmService.add(deviceAlarm);
}
- logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
+
if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
publisher.deviceAlarmEventPublish(deviceAlarm);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
index 67d26d7..652cd83 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -58,7 +58,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -83,7 +83,7 @@
if (rootElementAfterCharset == null) {
logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
- return;
+ continue;
}
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 6662738..cdbbf33 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -131,6 +131,24 @@
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device);
+ result.onCompletion(()->{
+ // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
+ taskExecutor.execute(()->{
+ // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
+ String path = "snap";
+ String fileName = deviceId + "_" + channelId + ".jpg";
+ WVPResult wvpResult = (WVPResult)result.getResult();
+ if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+ StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+ MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
+ String streamUrl = streamInfoForSuccess.getFmp4();
+ // 璇锋眰鎴浘
+ logger.info("[璇锋眰鎴浘]: " + fileName);
+ zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+ }
+ });
+ });
+
if (streamInfo != null) {
String streamId = streamInfo.getStream();
if (streamId == null) {
@@ -192,21 +210,6 @@
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
logger.info(JSONObject.toJSONString(ssrcInfo));
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
- // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
- taskExecutor.execute(()->{
- // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
- String path = "snap";
- String fileName = deviceId + "_" + channelId + ".jpg";
- WVPResult wvpResult = (WVPResult)result.getResult();
- if (Objects.requireNonNull(wvpResult).getCode() == 0) {
- StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
- MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
- String streamUrl = streamInfoForSuccess.getFmp4();
- // 璇锋眰鎴浘
- logger.info("[璇锋眰鎴浘]: " + fileName);
- zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
- }
- });
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
index 84f5ef7..8d1b066 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -16,6 +16,7 @@
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
+import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -36,19 +37,20 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override
- public void onMessage(Message message, byte[] bytes) {
+ public void onMessage(@NotNull Message message, byte[] bytes) {
logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody()));
taskQueue.offer(message);
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
+ logger.info("[绾跨▼姹犱俊鎭痌娲诲姩绾跨▼鏁帮細{}, 鏈�澶х嚎绋嬫暟锛� {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
@@ -56,7 +58,7 @@
AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
if (alarmChannelMessage == null) {
logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触");
- return;
+ continue;
}
String gbId = alarmChannelMessage.getGbId();
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
index e6ba9b6..2d4a82f 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -90,7 +90,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -121,7 +121,7 @@
JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
- return;
+ continue;
}
if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
index bb2f4ad..b5d02a5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
@@ -35,7 +35,7 @@
@Autowired
private IVideoManagerStorage storager;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
index 13e6874..4fff32d 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -28,7 +28,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -53,7 +53,7 @@
MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
- return;
+ continue;
}
// 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
if (responseEvents.get(response.getApp() + response.getStream()) != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
index 3925836..b69a587 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -40,7 +40,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
index 4b1c2d5..2faf3b8 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
@@ -42,7 +42,7 @@
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -61,7 +61,7 @@
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
if (statusChangeFromPushStream == null) {
logger.warn("[REDIS娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�");
- return;
+ continue;
}
// 鍙栨秷瀹氭椂浠诲姟
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
index 1897b6f..415787e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -35,7 +35,7 @@
private boolean taskQueueHandlerRun = false;
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@@ -53,13 +53,13 @@
JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
if (steamMsgJson == null) {
logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
- return;
+ continue;
}
String serverId = steamMsgJson.getString("serverId");
if (userSetting.getServerId().equals(serverId)) {
// 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
- return;
+ continue;
}
logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
String app = steamMsgJson.getString("app");
--
Gitblit v1.8.0