src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -92,39 +92,36 @@ @Override public void process(RequestEvent evt) { try { taskQueue.offer(new HandlerCatchData(evt, null, null)); responseAck(evt, Response.OK); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(()-> { while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); Element rootElement = getRootElement(take.getEvt()); String cmd = XmlUtil.getText(rootElement, "CmdType"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); Element rootElement = getRootElement(take.getEvt()); String cmd = XmlUtil.getText(rootElement, "CmdType"); if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); processNotifyCatalogList(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { logger.info("接收到MobilePosition通知"); processNotifyMobilePosition(take.getEvt()); } else { logger.info("接收到消息:" + cmd); } } catch (DocumentException e) { throw new RuntimeException(e); } if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); processNotifyCatalogList(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { logger.info("接收到MobilePosition通知"); processNotifyMobilePosition(take.getEvt()); } else { logger.info("接收到消息:" + cmd); } taskQueueHandlerRun = false; }); } catch (DocumentException e) { throw new RuntimeException(e); } } taskQueueHandlerRun = false; }); } } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } @@ -174,7 +171,7 @@ } else { mobilePosition.setAltitude(0.0); } logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -67,9 +67,9 @@ JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class); JSONArray tracks = mediaJSON.getJSONArray("tracks"); if (authority) { streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId()); streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,streamAuthorityInfo.getCallId()); }else { streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null); streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null); } } src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
@@ -1,15 +1,20 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.concurrent.ConcurrentLinkedQueue; /** * 接收来自redis的GPS更新通知 @@ -20,13 +25,31 @@ private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); private boolean taskQueueHandlerRun = false; @Autowired private IRedisCatchStorage redisCatchStorage; private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void onMessage(@NotNull Message message, byte[] bytes) { // TODO 加消息队列 GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); taskQueue.offer(message); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } taskQueueHandlerRun = false; }); } } } src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
@@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -21,14 +22,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -40,6 +44,8 @@ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); private boolean taskQueueHandlerRun = false; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -47,39 +53,51 @@ private IStreamPushService streamPushService; @Autowired private EventPublisher eventPublisher; @Autowired private UserSetting userSetting; @Autowired private DynamicTask dynamicTask; private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void onMessage(Message message, byte[] bytes) { // TODO 增加队列 logger.warn("[REDIS 消息-推流设备状态变化]: {}", new String(message.getBody())); // PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class); if (statusChangeFromPushStream == null) { logger.warn("[REDIS 消息]推流设备状态变化消息解析失败"); return; } // 取消定时任务 dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); if (statusChangeFromPushStream.isSetAllOffline()) { // 所有设备离线 streamPushService.allStreamOffline(); } if (statusChangeFromPushStream.getOfflineStreams() != null && statusChangeFromPushStream.getOfflineStreams().size() > 0) { // 更新部分设备离线 streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); } if (statusChangeFromPushStream.getOnlineStreams() != null && statusChangeFromPushStream.getOnlineStreams().size() > 0) { // 更新部分设备上线 streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); taskQueue.offer(message); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); if (statusChangeFromPushStream == null) { logger.warn("[REDIS 消息]推流设备状态变化消息解析失败"); return; } // 取消定时任务 dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); if (statusChangeFromPushStream.isSetAllOffline()) { // 所有设备离线 streamPushService.allStreamOffline(); } if (statusChangeFromPushStream.getOfflineStreams() != null && statusChangeFromPushStream.getOfflineStreams().size() > 0) { // 更新部分设备离线 streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); } if (statusChangeFromPushStream.getOnlineStreams() != null && statusChangeFromPushStream.getOnlineStreams().size() > 0) { // 更新部分设备上线 streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); } } taskQueueHandlerRun = false; }); } } src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -688,21 +688,21 @@ @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; logger.info("[redis 移动位置订阅通知] {}: {}", key, jsonObject.toString()); logger.info("[redis发送通知]移动位置 {}: {}", key, jsonObject.toString()); redis.convertAndSend(key, jsonObject); } @Override public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream()); logger.info("[redis发送通知]推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream()); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } @Override public void sendAlarmMsg(AlarmChannelMessage msg) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM; logger.info("[redis 报警通知] {}: {}", key, JSON.toJSON(msg)); logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg)); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } @@ -715,7 +715,7 @@ @Override public void sendStreamPushRequestedMsgForStatus() { String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED; logger.info("[redis 通知]获取所有推流设备的状态"); logger.info("[redis通知]获取所有推流设备的状态"); JSONObject jsonObject = new JSONObject(); jsonObject.put(key, key); redis.convertAndSend(key, jsonObject);