From bc38f5ef299f44f65fd34258b84272a027c10cb6 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 27 七月 2022 14:48:21 +0800 Subject: [PATCH] 修复流地址返回错误 --- src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 8 +- src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java | 4 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java | 29 ++++++++- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 49 +++++++-------- src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java | 72 +++++++++++++++--------- 5 files changed, 100 insertions(+), 62 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index e191578..25261f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -92,39 +92,36 @@ @Override public void process(RequestEvent evt) { try { - taskQueue.offer(new HandlerCatchData(evt, null, null)); responseAck(evt, Response.OK); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(()-> { - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = getRootElement(take.getEvt()); - String cmd = XmlUtil.getText(rootElement, "CmdType"); + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + Element rootElement = getRootElement(take.getEvt()); + String cmd = XmlUtil.getText(rootElement, "CmdType"); - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } catch (DocumentException e) { - throw new RuntimeException(e); - } + if (CmdType.CATALOG.equals(cmd)) { + logger.info("鎺ユ敹鍒癈atalog閫氱煡"); + processNotifyCatalogList(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("鎺ユ敹鍒癆larm閫氱煡"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); } - taskQueueHandlerRun = false; - }); + } catch (DocumentException e) { + throw new RuntimeException(e); + } + } + taskQueueHandlerRun = false; + }); } - - } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } @@ -174,7 +171,7 @@ } else { mobilePosition.setAltitude(0.0); } - logger.info("[鏀跺埌 绉诲姩浣嶇疆璁㈤槄]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index fb7cea0..c712045 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -67,9 +67,9 @@ JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class); JSONArray tracks = mediaJSON.getJSONArray("tracks"); if (authority) { - streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId()); + streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,streamAuthorityInfo.getCallId()); }else { - streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null); + streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java index 7482833..d5a26e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java @@ -1,15 +1,20 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; /** * 鎺ユ敹鏉ヨ嚜redis鐨凣PS鏇存柊閫氱煡 @@ -20,13 +25,31 @@ private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); + private boolean taskQueueHandlerRun = false; + @Autowired private IRedisCatchStorage redisCatchStorage; + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override public void onMessage(@NotNull Message message, byte[] bytes) { - // TODO 鍔犳秷鎭槦鍒� - GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); - redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); + redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + } + taskQueueHandlerRun = false; + }); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java index 90ed8d4..8606702 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java @@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -21,14 +22,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -40,6 +44,8 @@ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); + private boolean taskQueueHandlerRun = false; + @Autowired private IRedisCatchStorage redisCatchStorage; @@ -47,39 +53,51 @@ private IStreamPushService streamPushService; @Autowired - private EventPublisher eventPublisher; - - @Autowired - private UserSetting userSetting; - - @Autowired private DynamicTask dynamicTask; + + + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; @Override public void onMessage(Message message, byte[] bytes) { // TODO 澧炲姞闃熷垪 logger.warn("[REDIS 娑堟伅-鎺ㄦ祦璁惧鐘舵�佸彉鍖朷锛� {}", new String(message.getBody())); - // - PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class); - if (statusChangeFromPushStream == null) { - logger.warn("[REDIS 娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�"); - return; - } - // 鍙栨秷瀹氭椂浠诲姟 - dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); - if (statusChangeFromPushStream.isSetAllOffline()) { - // 鎵�鏈夎澶囩绾� - streamPushService.allStreamOffline(); - } - if (statusChangeFromPushStream.getOfflineStreams() != null - && statusChangeFromPushStream.getOfflineStreams().size() > 0) { - // 鏇存柊閮ㄥ垎璁惧绂荤嚎 - streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); - } - if (statusChangeFromPushStream.getOnlineStreams() != null && - statusChangeFromPushStream.getOnlineStreams().size() > 0) { - // 鏇存柊閮ㄥ垎璁惧涓婄嚎 - streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); + taskQueue.offer(message); + + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); + if (statusChangeFromPushStream == null) { + logger.warn("[REDIS 娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�"); + return; + } + // 鍙栨秷瀹氭椂浠诲姟 + dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); + if (statusChangeFromPushStream.isSetAllOffline()) { + // 鎵�鏈夎澶囩绾� + streamPushService.allStreamOffline(); + } + if (statusChangeFromPushStream.getOfflineStreams() != null + && statusChangeFromPushStream.getOfflineStreams().size() > 0) { + // 鏇存柊閮ㄥ垎璁惧绂荤嚎 + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); + } + if (statusChangeFromPushStream.getOnlineStreams() != null && + statusChangeFromPushStream.getOnlineStreams().size() > 0) { + // 鏇存柊閮ㄥ垎璁惧涓婄嚎 + streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); + } + } + taskQueueHandlerRun = false; + }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 5b2e515..31294a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -688,21 +688,21 @@ @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; - logger.info("[redis 绉诲姩浣嶇疆璁㈤槄閫氱煡] {}: {}", key, jsonObject.toString()); + logger.info("[redis鍙戦�侀�氱煡]绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); redis.convertAndSend(key, jsonObject); } @Override public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; - logger.info("[redis 鎺ㄦ祦琚姹傞�氱煡] {}: {}/{}", key, msg.getApp(), msg.getStream()); + logger.info("[redis鍙戦�侀�氱煡]鎺ㄦ祦琚姹� {}: {}/{}", key, msg.getApp(), msg.getStream()); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } @Override public void sendAlarmMsg(AlarmChannelMessage msg) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM; - logger.info("[redis 鎶ヨ閫氱煡] {}: {}", key, JSON.toJSON(msg)); + logger.info("[redis鍙戦�侀�氱煡] 鎶ヨ{}: {}", key, JSON.toJSON(msg)); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } @@ -715,7 +715,7 @@ @Override public void sendStreamPushRequestedMsgForStatus() { String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED; - logger.info("[redis 閫氱煡]鑾峰彇鎵�鏈夋帹娴佽澶囩殑鐘舵��"); + logger.info("[redis閫氱煡]鑾峰彇鎵�鏈夋帹娴佽澶囩殑鐘舵��"); JSONObject jsonObject = new JSONObject(); jsonObject.put(key, key); redis.convertAndSend(key, jsonObject); -- Gitblit v1.8.0