From 5e73874880cdfd5b6b99147a0cdd8a6eabcfbf16 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 22 九月 2022 11:22:08 +0800
Subject: [PATCH] 添加队列处理redis消息和sip消息,支持使用推流状态作为通道在线状态
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java | 152 ++++---
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 6
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 32
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java | 18
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 75 +++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java | 219 ++++++----
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java | 3
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java | 91 ++++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java | 103 +++++
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java | 2
src/main/resources/all-application.yml | 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 5
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java | 100 +++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 4
/dev/null | 72 ---
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java | 6
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java | 10
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 13
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java | 56 ++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 180 +++++---
20 files changed, 789 insertions(+), 360 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
index 017b39d..cad6e69 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -31,6 +31,8 @@
private Boolean logInDatebase = Boolean.TRUE;
+ private Boolean usePushingAsStatus = Boolean.TRUE;
+
private String serverId = "000000";
private String thirdPartyGBIdReg = "[\\s\\S]*";
@@ -136,4 +138,12 @@
public void setPlatformPlayTimeout(int platformPlayTimeout) {
this.platformPlayTimeout = platformPlayTimeout;
}
+
+ public Boolean isUsePushingAsStatus() {
+ return usePushingAsStatus;
+ }
+
+ public void setUsePushingAsStatus(Boolean usePushingAsStatus) {
+ this.usePushingAsStatus = usePushingAsStatus;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
index 0b653cf..e412f71 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -3,7 +3,7 @@
import com.alibaba.fastjson.parser.ParserConfig;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.service.impl.*;
+import com.genersoft.iot.vmp.service.redisMsg.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java
new file mode 100644
index 0000000..302539b
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java
@@ -0,0 +1,56 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+import org.dom4j.Element;
+
+import javax.sip.RequestEvent;
+
+public class SipMsgInfo {
+ private RequestEvent evt;
+ private Device device;
+ private ParentPlatform platform;
+ private Element rootElement;
+
+ public SipMsgInfo(RequestEvent evt, Device device, Element rootElement) {
+ this.evt = evt;
+ this.device = device;
+ this.rootElement = rootElement;
+ }
+
+ public SipMsgInfo(RequestEvent evt, ParentPlatform platform, Element rootElement) {
+ this.evt = evt;
+ this.platform = platform;
+ this.rootElement = rootElement;
+ }
+
+ public RequestEvent getEvt() {
+ return evt;
+ }
+
+ public void setEvt(RequestEvent evt) {
+ this.evt = evt;
+ }
+
+ public Device getDevice() {
+ return device;
+ }
+
+ public void setDevice(Device device) {
+ this.device = device;
+ }
+
+ public ParentPlatform getPlatform() {
+ return platform;
+ }
+
+ public void setPlatform(ParentPlatform platform) {
+ this.platform = platform;
+ }
+
+ public Element getRootElement() {
+ return rootElement;
+ }
+
+ public void setRootElement(Element rootElement) {
+ this.rootElement = rootElement;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index f8c3abf..33cc6e6 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -14,18 +14,15 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.utils.SerializeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import javax.sip.Dialog;
-import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 6ff5c0f..ff3c78f 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -22,8 +22,8 @@
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
-import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
+import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
index add32d1..da2cb9c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -21,6 +21,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -31,6 +33,7 @@
import javax.sip.message.Response;
import java.text.ParseException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
@@ -67,6 +70,15 @@
@Autowired
private IDeviceChannelService deviceChannelService;
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
@@ -75,114 +87,127 @@
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
logger.info("[鏀跺埌鎶ヨ閫氱煡]璁惧锛歿}", device.getDeviceId());
- // 鍥炲200 OK
- try {
- responseAck(getServerTransaction(evt), Response.OK);
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
- }
- Element deviceIdElement = rootElement.element("DeviceID");
- String channelId = deviceIdElement.getText().toString();
+ taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ SipMsgInfo sipMsgInfo = taskQueue.poll();
+ // 鍥炲200 OK
+ try {
+ responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
+ }
- DeviceAlarm deviceAlarm = new DeviceAlarm();
- deviceAlarm.setCreateTime(DateUtil.getNow());
- deviceAlarm.setDeviceId(device.getDeviceId());
- deviceAlarm.setChannelId(channelId);
- deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
- deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));
- String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
- if (alarmTime == null) {
- return;
- }
- deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
- String alarmDescription = getText(rootElement, "AlarmDescription");
- if (alarmDescription == null) {
- deviceAlarm.setAlarmDescription("");
- } else {
- deviceAlarm.setAlarmDescription(alarmDescription);
- }
- String longitude = getText(rootElement, "Longitude");
- if (longitude != null && NumericUtil.isDouble(longitude)) {
- deviceAlarm.setLongitude(Double.parseDouble(longitude));
- } else {
- deviceAlarm.setLongitude(0.00);
- }
- String latitude = getText(rootElement, "Latitude");
- if (latitude != null && NumericUtil.isDouble(latitude)) {
- deviceAlarm.setLatitude(Double.parseDouble(latitude));
- } else {
- deviceAlarm.setLatitude(0.00);
- }
+ Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
+ String channelId = deviceIdElement.getText().toString();
- if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
- if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
- MobilePosition mobilePosition = new MobilePosition();
- mobilePosition.setCreateTime(DateUtil.getNow());
- mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
- mobilePosition.setTime(deviceAlarm.getAlarmTime());
- mobilePosition.setLongitude(deviceAlarm.getLongitude());
- mobilePosition.setLatitude(deviceAlarm.getLatitude());
- mobilePosition.setReportSource("GPS Alarm");
+ DeviceAlarm deviceAlarm = new DeviceAlarm();
+ deviceAlarm.setCreateTime(DateUtil.getNow());
+ deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+ deviceAlarm.setChannelId(channelId);
+ deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
+ deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
+ String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
+ if (alarmTime == null) {
+ return;
+ }
+ deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
+ String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
+ if (alarmDescription == null) {
+ deviceAlarm.setAlarmDescription("");
+ } else {
+ deviceAlarm.setAlarmDescription(alarmDescription);
+ }
+ String longitude = getText(sipMsgInfo.getRootElement(), "Longitude");
+ if (longitude != null && NumericUtil.isDouble(longitude)) {
+ deviceAlarm.setLongitude(Double.parseDouble(longitude));
+ } else {
+ deviceAlarm.setLongitude(0.00);
+ }
+ String latitude = getText(sipMsgInfo.getRootElement(), "Latitude");
+ if (latitude != null && NumericUtil.isDouble(latitude)) {
+ deviceAlarm.setLatitude(Double.parseDouble(latitude));
+ } else {
+ deviceAlarm.setLatitude(0.00);
+ }
- // 鏇存柊device channel 鐨勭粡绾害
- DeviceChannel deviceChannel = new DeviceChannel();
- deviceChannel.setDeviceId(device.getDeviceId());
- deviceChannel.setChannelId(channelId);
- deviceChannel.setLongitude(mobilePosition.getLongitude());
- deviceChannel.setLatitude(mobilePosition.getLatitude());
- deviceChannel.setGpsTime(mobilePosition.getTime());
+ if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
+ if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
+ MobilePosition mobilePosition = new MobilePosition();
+ mobilePosition.setCreateTime(DateUtil.getNow());
+ mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
+ mobilePosition.setTime(deviceAlarm.getAlarmTime());
+ mobilePosition.setLongitude(deviceAlarm.getLongitude());
+ mobilePosition.setLatitude(deviceAlarm.getLatitude());
+ mobilePosition.setReportSource("GPS Alarm");
- deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+ // 鏇存柊device channel 鐨勭粡绾害
+ DeviceChannel deviceChannel = new DeviceChannel();
+ deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+ deviceChannel.setChannelId(channelId);
+ deviceChannel.setLongitude(mobilePosition.getLongitude());
+ deviceChannel.setLatitude(mobilePosition.getLatitude());
+ deviceChannel.setGpsTime(mobilePosition.getTime());
- mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
- mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
- mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
- mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+ deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
- if (userSetting.getSavePositionHistory()) {
- storager.insertMobilePosition(mobilePosition);
+ mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+ mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+ mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+
+ if (userSetting.getSavePositionHistory()) {
+ storager.insertMobilePosition(mobilePosition);
+ }
+ storager.updateChannelPosition(deviceChannel);
+
+ // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("time", mobilePosition.getTime());
+ jsonObject.put("serial", deviceChannel.getDeviceId());
+ jsonObject.put("code", deviceChannel.getChannelId());
+ jsonObject.put("longitude", mobilePosition.getLongitude());
+ jsonObject.put("latitude", mobilePosition.getLatitude());
+ jsonObject.put("altitude", mobilePosition.getAltitude());
+ jsonObject.put("direction", mobilePosition.getDirection());
+ jsonObject.put("speed", mobilePosition.getSpeed());
+ redisCatchStorage.sendMobilePositionMsg(jsonObject);
+ }
+ }
+ if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
+ if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
+ deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
+ }
+ }
+
+ if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
+ // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡
+ AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
+ alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
+ alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
+ alarmChannelMessage.setGbId(channelId);
+ redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
+ return;
+ }
+
+ logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�");
+ // 瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�
+ if (sipConfig.isAlarm()) {
+ deviceAlarmService.add(deviceAlarm);
+ }
+ logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
+ if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
+ publisher.deviceAlarmEventPublish(deviceAlarm);
+ }
}
- storager.updateChannelPosition(deviceChannel);
-
- // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("time", mobilePosition.getTime());
- jsonObject.put("serial", deviceChannel.getDeviceId());
- jsonObject.put("code", deviceChannel.getChannelId());
- jsonObject.put("longitude", mobilePosition.getLongitude());
- jsonObject.put("latitude", mobilePosition.getLatitude());
- jsonObject.put("altitude", mobilePosition.getAltitude());
- jsonObject.put("direction", mobilePosition.getDirection());
- jsonObject.put("speed", mobilePosition.getSpeed());
- redisCatchStorage.sendMobilePositionMsg(jsonObject);
- }
- }
- if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
- if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
- deviceAlarm.setAlarmType(getText(rootElement.element("Info"), "AlarmType"));
- }
+ taskQueueHandlerRun = false;
+ });
}
- if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
- // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡
- AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
- alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
- alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
- alarmChannelMessage.setGbId(channelId);
- redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
- return;
- }
- logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�");
- // 瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�
- if (sipConfig.isAlarm()) {
- deviceAlarmService.add(deviceAlarm);
- }
- logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
- if (redisCatchStorage.deviceIsOnline(device.getDeviceId())) {
- publisher.deviceAlarmEventPublish(deviceAlarm);
- }
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
index b5051c0..67d26d7 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -17,6 +17,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -26,6 +28,7 @@
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@@ -53,6 +56,14 @@
@Autowired
private IDeviceChannelService deviceChannelService;
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
@@ -61,78 +72,91 @@
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
- try {
- rootElement = getRootElement(evt, device.getCharset());
- if (rootElement == null) {
- logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", evt.getRequest());
- responseAck(getServerTransaction(evt), Response.BAD_REQUEST);
- return;
- }
- MobilePosition mobilePosition = new MobilePosition();
- mobilePosition.setCreateTime(DateUtil.getNow());
- if (!ObjectUtils.isEmpty(device.getName())) {
- mobilePosition.setDeviceName(device.getName());
- }
- mobilePosition.setDeviceId(device.getDeviceId());
- mobilePosition.setChannelId(getText(rootElement, "DeviceID"));
- mobilePosition.setTime(getText(rootElement, "Time"));
- mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));
- mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));
- if (NumericUtil.isDouble(getText(rootElement, "Speed"))) {
- mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed")));
- } else {
- mobilePosition.setSpeed(0.0);
- }
- if (NumericUtil.isDouble(getText(rootElement, "Direction"))) {
- mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction")));
- } else {
- mobilePosition.setDirection(0.0);
- }
- if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) {
- mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude")));
- } else {
- mobilePosition.setAltitude(0.0);
- }
- mobilePosition.setReportSource("Mobile Position");
+ taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ SipMsgInfo sipMsgInfo = taskQueue.poll();
+ try {
+ Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
+ if (rootElementAfterCharset == null) {
+ logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
+ responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
+ return;
+ }
+ MobilePosition mobilePosition = new MobilePosition();
+ mobilePosition.setCreateTime(DateUtil.getNow());
+ if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) {
+ mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName());
+ }
+ mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+ mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID"));
+ mobilePosition.setTime(getText(rootElementAfterCharset, "Time"));
+ mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
+ mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
+ if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
+ mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
+ } else {
+ mobilePosition.setSpeed(0.0);
+ }
+ if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
+ mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
+ } else {
+ mobilePosition.setDirection(0.0);
+ }
+ if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
+ mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
+ } else {
+ mobilePosition.setAltitude(0.0);
+ }
+ mobilePosition.setReportSource("Mobile Position");
- // 鏇存柊device channel 鐨勭粡绾害
- DeviceChannel deviceChannel = new DeviceChannel();
- deviceChannel.setDeviceId(device.getDeviceId());
- deviceChannel.setChannelId(mobilePosition.getChannelId());
- deviceChannel.setLongitude(mobilePosition.getLongitude());
- deviceChannel.setLatitude(mobilePosition.getLatitude());
- deviceChannel.setGpsTime(mobilePosition.getTime());
+ // 鏇存柊device channel 鐨勭粡绾害
+ DeviceChannel deviceChannel = new DeviceChannel();
+ deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+ deviceChannel.setChannelId(mobilePosition.getChannelId());
+ deviceChannel.setLongitude(mobilePosition.getLongitude());
+ deviceChannel.setLatitude(mobilePosition.getLatitude());
+ deviceChannel.setGpsTime(mobilePosition.getTime());
- deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+ deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
- mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
- mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
- mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
- mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+ mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+ mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+ mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
- if (userSetting.getSavePositionHistory()) {
- storager.insertMobilePosition(mobilePosition);
- }
- storager.updateChannelPosition(deviceChannel);
- //鍥炲 200 OK
- responseAck(getServerTransaction(evt), Response.OK);
+ if (userSetting.getSavePositionHistory()) {
+ storager.insertMobilePosition(mobilePosition);
+ }
+ storager.updateChannelPosition(deviceChannel);
+ //鍥炲 200 OK
+ responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
- // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("time", mobilePosition.getTime());
- jsonObject.put("serial", deviceChannel.getDeviceId());
- jsonObject.put("code", deviceChannel.getChannelId());
- jsonObject.put("longitude", mobilePosition.getLongitude());
- jsonObject.put("latitude", mobilePosition.getLatitude());
- jsonObject.put("altitude", mobilePosition.getAltitude());
- jsonObject.put("direction", mobilePosition.getDirection());
- jsonObject.put("speed", mobilePosition.getSpeed());
- redisCatchStorage.sendMobilePositionMsg(jsonObject);
+ // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("time", mobilePosition.getTime());
+ jsonObject.put("serial", deviceChannel.getDeviceId());
+ jsonObject.put("code", deviceChannel.getChannelId());
+ jsonObject.put("longitude", mobilePosition.getLongitude());
+ jsonObject.put("latitude", mobilePosition.getLatitude());
+ jsonObject.put("altitude", mobilePosition.getAltitude());
+ jsonObject.put("direction", mobilePosition.getDirection());
+ jsonObject.put("speed", mobilePosition.getSpeed());
+ redisCatchStorage.sendMobilePositionMsg(jsonObject);
- } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
- e.printStackTrace();
+ } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
+ e.printStackTrace();
+ }
+
+ }
+ taskQueueHandlerRun = false;
+ });
}
+
+
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index dedc7c1..3c5644b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -58,6 +59,9 @@
@Autowired
private GbStreamMapper gbStreamMapper;
+
+ @Autowired
+ private UserSetting userSetting;
@@ -241,7 +245,7 @@
if (subscribe != null) {
// TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔
- List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId());
+ List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus());
if (gbStreams.size() == 0) {
return;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index c0b9e95..6662738 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -131,23 +131,6 @@
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device);
- result.onCompletion(()->{
- // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
- taskExecutor.execute(()->{
- // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
- String path = "snap";
- String fileName = deviceId + "_" + channelId + ".jpg";
- WVPResult wvpResult = (WVPResult)result.getResult();
- if (Objects.requireNonNull(wvpResult).getCode() == 0) {
- StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
- MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
- String streamUrl = streamInfoForSuccess.getFmp4();
- // 璇锋眰鎴浘
- logger.info("[璇锋眰鎴浘]: " + fileName);
- zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
- }
- });
- });
if (streamInfo != null) {
String streamId = streamInfo.getStream();
if (streamId == null) {
@@ -209,6 +192,21 @@
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
logger.info(JSONObject.toJSONString(ssrcInfo));
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
+ // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
+ taskExecutor.execute(()->{
+ // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
+ String path = "snap";
+ String fileName = deviceId + "_" + channelId + ".jpg";
+ WVPResult wvpResult = (WVPResult)result.getResult();
+ if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+ StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+ MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
+ String streamUrl = streamInfoForSuccess.getFmp4();
+ // 璇锋眰鎴浘
+ logger.info("[璇锋眰鎴浘]: " + fileName);
+ zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+ }
+ });
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java
deleted file mode 100644
index 1634234..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.service.IPlatformChannelService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.List;
-
-
-@Component
-public class RedisAlarmMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class);
-
- @Autowired
- private ISIPCommander commander;
-
- @Autowired
- private ISIPCommanderForPlatform commanderForPlatform;
-
- @Autowired
- private IVideoManagerStorage storage;
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody()));
- AlarmChannelMessage alarmChannelMessage = JSON.parseObject(message.getBody(), AlarmChannelMessage.class);
- if (alarmChannelMessage == null) {
- logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触");
- return;
- }
- String gbId = alarmChannelMessage.getGbId();
-
- DeviceAlarm deviceAlarm = new DeviceAlarm();
- deviceAlarm.setCreateTime(DateUtil.getNow());
- deviceAlarm.setChannelId(gbId);
- deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
- deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
- deviceAlarm.setAlarmPriority("1");
- deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
- deviceAlarm.setAlarmType("1");
- deviceAlarm.setLongitude(0);
- deviceAlarm.setLatitude(0);
-
- if (ObjectUtils.isEmpty(gbId)) {
- // 鍙戦�佺粰鎵�鏈夌殑涓婄骇
- List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
- if (parentPlatforms.size() > 0) {
- for (ParentPlatform parentPlatform : parentPlatforms) {
- commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
- }
- }
- }else {
- Device device = storage.queryVideoDevice(gbId);
- ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
- if (device != null && platform == null) {
- commander.sendAlarmMessage(device, deviceAlarm);
- }else if (device == null && platform != null){
- commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
- }else {
- logger.warn("鏃犳硶纭畾" + gbId + "鏄钩鍙拌繕鏄澶�");
- }
- }
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
deleted file mode 100644
index 56c9ff3..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IGbStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
- * @author lin
- */
-@Component
-public class RedisPushStreamResponseListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
-
- private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
- public interface PushStreamResponseEvent{
- void run(MessageForPushChannelResponse response);
- }
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- //
- logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
- MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);
- if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
- logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
- return;
- }
- // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
- if (responseEvents.get(response.getApp() + response.getStream()) != null) {
- responseEvents.get(response.getApp() + response.getStream()).run(response);
- }
- }
-
- public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
- responseEvents.put(app + stream, callback);
- }
-
- public void removeEvent(String app, String stream) {
- responseEvents.remove(app + stream);
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
deleted file mode 100644
index bedbf44..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IGbStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.*;
-
-/**
- * @Auther: JiangFeng
- * @Date: 2022/8/16 11:32
- * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
- */
-@Component
-public class RedisPushStreamStatusListMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
- @Resource
- private IMediaServerService mediaServerService;
-
- @Resource
- private IStreamPushService streamPushService;
- @Resource
- private IGbStreamService gbStreamService;
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- //
- logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
- List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
- //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
- List<String> allAppAndStream = streamPushService.getAllAppAndStream();
-
- /**
- * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
- */
- List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
- List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
- for (StreamPushItem streamPushItem : streamPushItems) {
- String app = streamPushItem.getApp();
- String stream = streamPushItem.getStream();
- boolean contains = allAppAndStream.contains(app + stream);
- //涓嶅瓨鍦ㄥ氨娣诲姞
- if (!contains) {
- streamPushItem.setStreamType("push");
- streamPushItem.setCreateTime(DateUtil.getNow());
- streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
- streamPushItem.setOriginType(2);
- streamPushItem.setOriginTypeStr("rtsp_push");
- streamPushItem.setTotalReaderCount("0");
- streamPushItemForSave.add(streamPushItem);
- } else {
- //瀛樺湪灏卞彧淇敼 name鍜実bId
- streamPushItemForUpdate.add(streamPushItem);
- }
- }
- if (streamPushItemForSave.size() > 0) {
-
- logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
- logger.info(JSONObject.toJSONString(streamPushItemForSave));
- streamPushService.batchAdd(streamPushItemForSave);
-
- }
- if(streamPushItemForUpdate.size()>0){
- logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
- logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
- gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
- }
-
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
deleted file mode 100644
index 118a227..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-
-/**
- * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
- * @author lin
- */
-@Component
-public class RedisStreamMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
-
- @Autowired
- private UserSetting userSetting;
-
- @Autowired
- private ZLMMediaListManager zlmMediaListManager;
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
-
- JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);
- if (steamMsgJson == null) {
- logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
- return;
- }
- String serverId = steamMsgJson.getString("serverId");
-
- if (userSetting.getServerId().equals(serverId)) {
- // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
- return;
- }
- logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
- String app = steamMsgJson.getString("app");
- String stream = steamMsgJson.getString("stream");
- boolean register = steamMsgJson.getBoolean("register");
- String mediaServerId = steamMsgJson.getString("mediaServerId");
- MediaItem mediaItem = new MediaItem();
- mediaItem.setSeverId(serverId);
- mediaItem.setApp(app);
- mediaItem.setStream(stream);
- mediaItem.setRegist(register);
- mediaItem.setMediaServerId(mediaServerId);
- mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
- mediaItem.setAliveSecond(0L);
- mediaItem.setTotalReaderCount("0");
- mediaItem.setOriginType(0);
- mediaItem.setOriginTypeStr("0");
- mediaItem.setOriginTypeStr("unknown");
- if (register) {
- zlmMediaListManager.addPush(mediaItem);
- }else {
- zlmMediaListManager.removeMedia(app, stream);
- }
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
new file mode 100644
index 0000000..84f5ef7
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -0,0 +1,100 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+@Component
+public class RedisAlarmMsgListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class);
+
+ @Autowired
+ private ISIPCommander commander;
+
+ @Autowired
+ private ISIPCommanderForPlatform commanderForPlatform;
+
+ @Autowired
+ private IVideoManagerStorage storage;
+
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody()));
+
+ taskQueue.offer(message);
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+
+ AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
+ if (alarmChannelMessage == null) {
+ logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触");
+ return;
+ }
+ String gbId = alarmChannelMessage.getGbId();
+
+ DeviceAlarm deviceAlarm = new DeviceAlarm();
+ deviceAlarm.setCreateTime(DateUtil.getNow());
+ deviceAlarm.setChannelId(gbId);
+ deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
+ deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
+ deviceAlarm.setAlarmPriority("1");
+ deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
+ deviceAlarm.setAlarmType("1");
+ deviceAlarm.setLongitude(0);
+ deviceAlarm.setLatitude(0);
+
+ if (ObjectUtils.isEmpty(gbId)) {
+ // 鍙戦�佺粰鎵�鏈夌殑涓婄骇
+ List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
+ if (parentPlatforms.size() > 0) {
+ for (ParentPlatform parentPlatform : parentPlatforms) {
+ commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
+ }
+ }
+ }else {
+ Device device = storage.queryVideoDevice(gbId);
+ ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
+ if (device != null && platform == null) {
+ commander.sendAlarmMessage(device, deviceAlarm);
+ }else if (device == null && platform != null){
+ commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
+ }else {
+ logger.warn("鏃犳硶纭畾" + gbId + "鏄钩鍙拌繕鏄澶�");
+ }
+ }
+ }
+ taskQueueHandlerRun = false;
+ });
+ }
+
+
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
similarity index 67%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
index e327d0f..e6ba9b6 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.service.impl;
+package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@@ -19,8 +19,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
@@ -28,6 +30,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -85,6 +88,14 @@
@Autowired
private ZlmHttpHookSubscribe subscribe;
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
public interface PlayMsgCallback{
void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
@@ -100,94 +111,107 @@
@Override
public void onMessage(Message message, byte[] bytes) {
- JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class);
- WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
- if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
- return;
- }
- if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
- logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(message.getBody()));
- switch (wvpRedisMsg.getCmd()){
- case WvpRedisMsgCmd.GET_SEND_ITEM:
- RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
- requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
- RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
- requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- default:
- break;
- }
+ taskQueue.offer(message);
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
+ WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
+ if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
+ return;
+ }
+ if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
+ logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
- }else {
- logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(message.getBody()));
- switch (wvpRedisMsg.getCmd()){
- case WvpRedisMsgCmd.GET_SEND_ITEM:
+ switch (wvpRedisMsg.getCmd()){
+ case WvpRedisMsgCmd.GET_SEND_ITEM:
+ RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
+ requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+ break;
+ case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
+ RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
+ requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+ break;
+ default:
+ break;
+ }
- WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
+ }else {
+ logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody()));
+ switch (wvpRedisMsg.getCmd()){
+ case WvpRedisMsgCmd.GET_SEND_ITEM:
- String key = wvpRedisMsg.getSerial();
- switch (content.getCode()) {
- case 0:
- ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
- PlayMsgCallback playMsgCallback = callbacks.get(key);
- if (playMsgCallback != null) {
- callbacksForError.remove(key);
- try {
- playMsgCallback.handler(responseSendItemMsg);
- } catch (ParseException e) {
- throw new RuntimeException(e);
+ WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
+
+ String key = wvpRedisMsg.getSerial();
+ switch (content.getCode()) {
+ case 0:
+ ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
+ PlayMsgCallback playMsgCallback = callbacks.get(key);
+ if (playMsgCallback != null) {
+ callbacksForError.remove(key);
+ try {
+ playMsgCallback.handler(responseSendItemMsg);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ break;
+ case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
+ case ERROR_CODE_OFFLINE:
+ case ERROR_CODE_TIMEOUT:
+ PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
+ if (errorCallback != null) {
+ callbacks.remove(key);
+ errorCallback.handler(content);
+ }
+ break;
+ default:
+ break;
}
- }
- break;
- case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
- case ERROR_CODE_OFFLINE:
- case ERROR_CODE_TIMEOUT:
- PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
- if (errorCallback != null) {
- callbacks.remove(key);
- errorCallback.handler(content);
- }
- break;
- default:
- break;
+ break;
+ case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
+ WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
+ String serial = wvpRedisMsg.getSerial();
+ switch (wvpResult.getCode()) {
+ case 0:
+ JSONObject jsonObject = (JSONObject)wvpResult.getData();
+ PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
+ if (playMsgCallback != null) {
+ callbacksForError.remove(serial);
+ playMsgCallback.handler(jsonObject);
+ }
+ break;
+ case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
+ case ERROR_CODE_OFFLINE:
+ case ERROR_CODE_TIMEOUT:
+ PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
+ if (errorCallback != null) {
+ callbacks.remove(serial);
+ errorCallback.handler(wvpResult);
+ }
+ break;
+ default:
+ break;
+ }
+ break;
+ default:
+ break;
+ }
}
- break;
- case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
- WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
- String serial = wvpRedisMsg.getSerial();
- switch (wvpResult.getCode()) {
- case 0:
- JSONObject jsonObject = (JSONObject)wvpResult.getData();
- PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
- if (playMsgCallback != null) {
- callbacksForError.remove(serial);
- playMsgCallback.handler(jsonObject);
- }
- break;
- case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
- case ERROR_CODE_OFFLINE:
- case ERROR_CODE_TIMEOUT:
- PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
- if (errorCallback != null) {
- callbacks.remove(serial);
- errorCallback.handler(wvpResult);
- }
- break;
- default:
- break;
- }
- break;
- default:
- break;
- }
+ }
+ taskQueueHandlerRun = false;
+ });
}
+
+
}
/**
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
similarity index 95%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
index 4e94d68..bb2f4ad 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
@@ -1,7 +1,6 @@
-package com.genersoft.iot.vmp.service.impl;
+package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson.JSON;
-import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
new file mode 100644
index 0000000..13e6874
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -0,0 +1,75 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+
+ private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+ public interface PushStreamResponseEvent{
+ void run(MessageForPushChannelResponse response);
+ }
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+ taskQueue.offer(message);
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
+ if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+ logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+ return;
+ }
+ // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+ if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+ responseEvents.get(response.getApp() + response.getStream()).run(response);
+ }
+ }
+ taskQueueHandlerRun = false;
+ });
+ }
+ }
+
+ public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+ responseEvents.put(app + stream, callback);
+ }
+
+ public void removeEvent(String app, String stream) {
+ responseEvents.remove(app + stream);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
new file mode 100644
index 0000000..3925836
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -0,0 +1,103 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @Auther: JiangFeng
+ * @Date: 2022/8/16 11:32
+ * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
+ */
+@Component
+public class RedisPushStreamStatusListMsgListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
+ @Resource
+ private IMediaServerService mediaServerService;
+
+ @Resource
+ private IStreamPushService streamPushService;
+ @Resource
+ private IGbStreamService gbStreamService;
+
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
+
+ taskQueue.offer(message);
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
+ //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+ List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+
+ /**
+ * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+ */
+ List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+ List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+ for (StreamPushItem streamPushItem : streamPushItems) {
+ String app = streamPushItem.getApp();
+ String stream = streamPushItem.getStream();
+ boolean contains = allAppAndStream.contains(app + stream);
+ //涓嶅瓨鍦ㄥ氨娣诲姞
+ if (!contains) {
+ streamPushItem.setStreamType("push");
+ streamPushItem.setCreateTime(DateUtil.getNow());
+ streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+ streamPushItem.setOriginType(2);
+ streamPushItem.setOriginTypeStr("rtsp_push");
+ streamPushItem.setTotalReaderCount("0");
+ streamPushItemForSave.add(streamPushItem);
+ } else {
+ //瀛樺湪灏卞彧淇敼 name鍜実bId
+ streamPushItemForUpdate.add(streamPushItem);
+ }
+ }
+ if (streamPushItemForSave.size() > 0) {
+
+ logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+ logger.info(JSONObject.toJSONString(streamPushItemForSave));
+ streamPushService.batchAdd(streamPushItemForSave);
+
+ }
+ if(streamPushItemForUpdate.size()>0){
+ logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+ logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+ gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+ }
+ }
+ taskQueueHandlerRun = false;
+ });
+ }
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
similarity index 82%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
index 50e894a..4b1c2d5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
@@ -1,24 +1,11 @@
-package com.genersoft.iot.vmp.service.impl;
+package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
-import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,8 +17,6 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -65,7 +50,6 @@
@Override
public void onMessage(Message message, byte[] bytes) {
- // TODO 澧炲姞闃熷垪
logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鐘舵�佸彉鍖朷锛� {}", new String(message.getBody()));
taskQueue.offer(message);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
new file mode 100644
index 0000000..1897b6f
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -0,0 +1,91 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+
+import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
+ * @author lin
+ */
+@Component
+public class RedisStreamMsgListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private ZLMMediaListManager zlmMediaListManager;
+
+ private boolean taskQueueHandlerRun = false;
+
+ private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+
+ taskQueue.offer(message);
+ if (!taskQueueHandlerRun) {
+ taskQueueHandlerRun = true;
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
+ if (steamMsgJson == null) {
+ logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
+ return;
+ }
+ String serverId = steamMsgJson.getString("serverId");
+
+ if (userSetting.getServerId().equals(serverId)) {
+ // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
+ return;
+ }
+ logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
+ String app = steamMsgJson.getString("app");
+ String stream = steamMsgJson.getString("stream");
+ boolean register = steamMsgJson.getBoolean("register");
+ String mediaServerId = steamMsgJson.getString("mediaServerId");
+ MediaItem mediaItem = new MediaItem();
+ mediaItem.setSeverId(serverId);
+ mediaItem.setApp(app);
+ mediaItem.setStream(stream);
+ mediaItem.setRegist(register);
+ mediaItem.setMediaServerId(mediaServerId);
+ mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
+ mediaItem.setAliveSecond(0L);
+ mediaItem.setTotalReaderCount("0");
+ mediaItem.setOriginType(0);
+ mediaItem.setOriginTypeStr("0");
+ mediaItem.setOriginTypeStr("unknown");
+ if (register) {
+ zlmMediaListManager.addPush(mediaItem);
+ }else {
+ zlmMediaListManager.removeMedia(app, stream);
+ }
+ }
+ taskQueueHandlerRun = false;
+ });
+ }
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index 5e74d99..8a5d455 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -75,18 +75,23 @@
"WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
GbStream queryStreamInPlatform(String platformId, String gbId);
- @Select("select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," +
+ @Select("<script> "+
+ "select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," +
" '1' as registerWay, pc.civilCode, 'live' as model, 'wvp-pro' as owner, '0' as parental,'0' as secrecy" +
" from gb_stream gt " +
" left join (" +
- " select sp.status, sp.app, sp.stream from stream_push sp" +
+ " select " +
+ " <if test='usPushingAsStatus != true'> sp.status as status, </if>" +
+ " <if test='usPushingAsStatus == true'> sp.pushIng as status, </if>" +
+ "sp.app, sp.stream from stream_push sp" +
" union all" +
" select spxy.status, spxy.app, spxy.stream from stream_proxy spxy" +
" ) st on st.app = gt.app and st.stream = gt.stream" +
" left join platform_gb_stream pgs on gt.gbStreamId = pgs.gbStreamId" +
" left join platform_catalog pc on pgs.catalogId = pc.id and pgs.platformId = pc.platformId" +
- " where pgs.platformId=#{platformId}")
- List<DeviceChannel> queryGbStreamListInPlatform(String platformId);
+ " where pgs.platformId=#{platformId}" +
+ "</script>")
+ List<DeviceChannel> queryGbStreamListInPlatform(String platformId, boolean usPushingAsStatus);
@Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
index 702b5be..f8a74fe 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
@@ -82,6 +83,9 @@
@Autowired
private GbStreamMapper gbStreamMapper;
+
+ @Autowired
+ private UserSetting userSetting;
@Autowired
private PlatformCatalogMapper catalogMapper;
@@ -614,7 +618,7 @@
*/
@Override
public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) {
- return gbStreamMapper.queryGbStreamListInPlatform(platformId);
+ return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus());
}
/**
diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml
index 3569de2..ed025b9 100644
--- a/src/main/resources/all-application.yml
+++ b/src/main/resources/all-application.yml
@@ -188,6 +188,8 @@
record-sip: true
# 鏄惁灏嗘棩蹇楀瓨鍌ㄨ繘鏁版嵁搴�
logInDatebase: true
+ # 浣跨敤鎺ㄦ祦鐘舵�佷綔涓烘帹娴侀�氶亾鐘舵��
+ use-pushing-as-status: true
# 鍏抽棴鍦ㄧ嚎鏂囨。锛堢敓浜х幆澧冨缓璁叧闂級
springdoc:
--
Gitblit v1.8.0