From 5e73874880cdfd5b6b99147a0cdd8a6eabcfbf16 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 22 九月 2022 11:22:08 +0800
Subject: [PATCH] 添加队列处理redis消息和sip消息,支持使用推流状态作为通道在线状态

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java |  152 ++++---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java                                                          |    6 
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                                                              |   32 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java                                         |   18 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java                                          |   75 +++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java          |  219 ++++++----
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java                                                      |    3 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java                                                   |   91 ++++
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java                                     |  103 +++++
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java                                                                    |    2 
 src/main/resources/all-application.yml                                                                                             |    2 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java                                   |    5 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java                                                    |  100 +++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java                                |    4 
 /dev/null                                                                                                                          |   72 ---
 src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java                                                     |    6 
 src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java                                                                          |   10 
 src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java                                                               |   13 
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java                                                                   |   56 ++
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java                                                   |  180 +++++---
 20 files changed, 789 insertions(+), 360 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
index 017b39d..cad6e69 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -31,6 +31,8 @@
 
     private Boolean logInDatebase = Boolean.TRUE;
 
+    private Boolean usePushingAsStatus = Boolean.TRUE;
+
     private String serverId = "000000";
 
     private String thirdPartyGBIdReg = "[\\s\\S]*";
@@ -136,4 +138,12 @@
     public void setPlatformPlayTimeout(int platformPlayTimeout) {
         this.platformPlayTimeout = platformPlayTimeout;
     }
+
+    public Boolean isUsePushingAsStatus() {
+        return usePushingAsStatus;
+    }
+
+    public void setUsePushingAsStatus(Boolean usePushingAsStatus) {
+        this.usePushingAsStatus = usePushingAsStatus;
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
index 0b653cf..e412f71 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -3,7 +3,7 @@
 
 import com.alibaba.fastjson.parser.ParserConfig;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.service.impl.*;
+import com.genersoft.iot.vmp.service.redisMsg.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cache.annotation.CachingConfigurerSupport;
 import org.springframework.context.annotation.Bean;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java
new file mode 100644
index 0000000..302539b
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java
@@ -0,0 +1,56 @@
+package com.genersoft.iot.vmp.gb28181.bean;
+
+import org.dom4j.Element;
+
+import javax.sip.RequestEvent;
+
+public class SipMsgInfo {
+    private RequestEvent evt;
+    private  Device device;
+    private ParentPlatform platform;
+    private Element rootElement;
+
+    public SipMsgInfo(RequestEvent evt, Device device, Element rootElement) {
+        this.evt = evt;
+        this.device = device;
+        this.rootElement = rootElement;
+    }
+
+    public SipMsgInfo(RequestEvent evt, ParentPlatform platform, Element rootElement) {
+        this.evt = evt;
+        this.platform = platform;
+        this.rootElement = rootElement;
+    }
+
+    public RequestEvent getEvt() {
+        return evt;
+    }
+
+    public void setEvt(RequestEvent evt) {
+        this.evt = evt;
+    }
+
+    public Device getDevice() {
+        return device;
+    }
+
+    public void setDevice(Device device) {
+        this.device = device;
+    }
+
+    public ParentPlatform getPlatform() {
+        return platform;
+    }
+
+    public void setPlatform(ParentPlatform platform) {
+        this.platform = platform;
+    }
+
+    public Element getRootElement() {
+        return rootElement;
+    }
+
+    public void setRootElement(Element rootElement) {
+        this.rootElement = rootElement;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index f8c3abf..33cc6e6 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -14,18 +14,15 @@
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.utils.SerializeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.sip.Dialog;
-import javax.sip.DialogState;
 import javax.sip.RequestEvent;
 import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 6ff5c0f..ff3c78f 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -22,8 +22,8 @@
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
-import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
+import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
index add32d1..da2cb9c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -21,6 +21,8 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;
@@ -31,6 +33,7 @@
 import javax.sip.message.Response;
 
 import java.text.ParseException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
 
@@ -67,6 +70,15 @@
     @Autowired
     private IDeviceChannelService deviceChannelService;
 
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
     @Override
     public void afterPropertiesSet() throws Exception {
         notifyMessageHandler.addHandler(cmdType, this);
@@ -75,114 +87,127 @@
     @Override
     public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
         logger.info("[鏀跺埌鎶ヨ閫氱煡]璁惧锛歿}", device.getDeviceId());
-        // 鍥炲200 OK
-        try {
-            responseAck(getServerTransaction(evt), Response.OK);
-        } catch (SipException | InvalidArgumentException | ParseException e) {
-            logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
-        }
 
-        Element deviceIdElement = rootElement.element("DeviceID");
-        String channelId = deviceIdElement.getText().toString();
+        taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    SipMsgInfo sipMsgInfo = taskQueue.poll();
+                    // 鍥炲200 OK
+                    try {
+                        responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[鏀跺埌鎶ヨ閫氱煡], 鍥炲200OK澶辫触", e);
+                    }
 
-        DeviceAlarm deviceAlarm = new DeviceAlarm();
-        deviceAlarm.setCreateTime(DateUtil.getNow());
-        deviceAlarm.setDeviceId(device.getDeviceId());
-        deviceAlarm.setChannelId(channelId);
-        deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
-        deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));
-        String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
-        if (alarmTime == null) {
-            return;
-        }
-        deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
-        String alarmDescription = getText(rootElement, "AlarmDescription");
-        if (alarmDescription == null) {
-            deviceAlarm.setAlarmDescription("");
-        } else {
-            deviceAlarm.setAlarmDescription(alarmDescription);
-        }
-        String longitude = getText(rootElement, "Longitude");
-        if (longitude != null && NumericUtil.isDouble(longitude)) {
-            deviceAlarm.setLongitude(Double.parseDouble(longitude));
-        } else {
-            deviceAlarm.setLongitude(0.00);
-        }
-        String latitude = getText(rootElement, "Latitude");
-        if (latitude != null && NumericUtil.isDouble(latitude)) {
-            deviceAlarm.setLatitude(Double.parseDouble(latitude));
-        } else {
-            deviceAlarm.setLatitude(0.00);
-        }
+                    Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
+                    String channelId = deviceIdElement.getText().toString();
 
-        if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
-            if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
-                MobilePosition mobilePosition = new MobilePosition();
-                mobilePosition.setCreateTime(DateUtil.getNow());
-                mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
-                mobilePosition.setTime(deviceAlarm.getAlarmTime());
-                mobilePosition.setLongitude(deviceAlarm.getLongitude());
-                mobilePosition.setLatitude(deviceAlarm.getLatitude());
-                mobilePosition.setReportSource("GPS Alarm");
+                    DeviceAlarm deviceAlarm = new DeviceAlarm();
+                    deviceAlarm.setCreateTime(DateUtil.getNow());
+                    deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+                    deviceAlarm.setChannelId(channelId);
+                    deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
+                    deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
+                    String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
+                    if (alarmTime == null) {
+                        return;
+                    }
+                    deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
+                    String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
+                    if (alarmDescription == null) {
+                        deviceAlarm.setAlarmDescription("");
+                    } else {
+                        deviceAlarm.setAlarmDescription(alarmDescription);
+                    }
+                    String longitude = getText(sipMsgInfo.getRootElement(), "Longitude");
+                    if (longitude != null && NumericUtil.isDouble(longitude)) {
+                        deviceAlarm.setLongitude(Double.parseDouble(longitude));
+                    } else {
+                        deviceAlarm.setLongitude(0.00);
+                    }
+                    String latitude = getText(sipMsgInfo.getRootElement(), "Latitude");
+                    if (latitude != null && NumericUtil.isDouble(latitude)) {
+                        deviceAlarm.setLatitude(Double.parseDouble(latitude));
+                    } else {
+                        deviceAlarm.setLatitude(0.00);
+                    }
 
-                // 鏇存柊device channel 鐨勭粡绾害
-                DeviceChannel deviceChannel = new DeviceChannel();
-                deviceChannel.setDeviceId(device.getDeviceId());
-                deviceChannel.setChannelId(channelId);
-                deviceChannel.setLongitude(mobilePosition.getLongitude());
-                deviceChannel.setLatitude(mobilePosition.getLatitude());
-                deviceChannel.setGpsTime(mobilePosition.getTime());
+                    if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
+                        if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
+                            MobilePosition mobilePosition = new MobilePosition();
+                            mobilePosition.setCreateTime(DateUtil.getNow());
+                            mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
+                            mobilePosition.setTime(deviceAlarm.getAlarmTime());
+                            mobilePosition.setLongitude(deviceAlarm.getLongitude());
+                            mobilePosition.setLatitude(deviceAlarm.getLatitude());
+                            mobilePosition.setReportSource("GPS Alarm");
 
-                deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+                            // 鏇存柊device channel 鐨勭粡绾害
+                            DeviceChannel deviceChannel = new DeviceChannel();
+                            deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+                            deviceChannel.setChannelId(channelId);
+                            deviceChannel.setLongitude(mobilePosition.getLongitude());
+                            deviceChannel.setLatitude(mobilePosition.getLatitude());
+                            deviceChannel.setGpsTime(mobilePosition.getTime());
 
-                mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-                mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-                mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-                mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+                            deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
 
-                if (userSetting.getSavePositionHistory()) {
-                    storager.insertMobilePosition(mobilePosition);
+                            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+                            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+                            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+                            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+
+                            if (userSetting.getSavePositionHistory()) {
+                                storager.insertMobilePosition(mobilePosition);
+                            }
+                            storager.updateChannelPosition(deviceChannel);
+
+                            // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+                            JSONObject jsonObject = new JSONObject();
+                            jsonObject.put("time", mobilePosition.getTime());
+                            jsonObject.put("serial", deviceChannel.getDeviceId());
+                            jsonObject.put("code", deviceChannel.getChannelId());
+                            jsonObject.put("longitude", mobilePosition.getLongitude());
+                            jsonObject.put("latitude", mobilePosition.getLatitude());
+                            jsonObject.put("altitude", mobilePosition.getAltitude());
+                            jsonObject.put("direction", mobilePosition.getDirection());
+                            jsonObject.put("speed", mobilePosition.getSpeed());
+                            redisCatchStorage.sendMobilePositionMsg(jsonObject);
+                        }
+                    }
+                    if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
+                        if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
+                            deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
+                        }
+                    }
+
+                    if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
+                        // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡
+                        AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
+                        alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
+                        alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
+                        alarmChannelMessage.setGbId(channelId);
+                        redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
+                        return;
+                    }
+
+                    logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�");
+                    // 瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�
+                    if (sipConfig.isAlarm()) {
+                        deviceAlarmService.add(deviceAlarm);
+                    }
+                    logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
+                    if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
+                        publisher.deviceAlarmEventPublish(deviceAlarm);
+                    }
                 }
-                storager.updateChannelPosition(deviceChannel);
-
-                // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
-                JSONObject jsonObject = new JSONObject();
-                jsonObject.put("time", mobilePosition.getTime());
-                jsonObject.put("serial", deviceChannel.getDeviceId());
-                jsonObject.put("code", deviceChannel.getChannelId());
-                jsonObject.put("longitude", mobilePosition.getLongitude());
-                jsonObject.put("latitude", mobilePosition.getLatitude());
-                jsonObject.put("altitude", mobilePosition.getAltitude());
-                jsonObject.put("direction", mobilePosition.getDirection());
-                jsonObject.put("speed", mobilePosition.getSpeed());
-                redisCatchStorage.sendMobilePositionMsg(jsonObject);
-            }
-        }
-        if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
-            if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
-                deviceAlarm.setAlarmType(getText(rootElement.element("Info"), "AlarmType"));
-            }
+                taskQueueHandlerRun = false;
+            });
         }
 
-        if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
-            // 鍙戦�佺粰骞冲彴鐨勬姤璀︿俊鎭�� 鍙戦�乺edis閫氱煡
-            AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
-            alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
-            alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
-            alarmChannelMessage.setGbId(channelId);
-            redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
-            return;
-        }
 
-        logger.debug("瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�");
-        // 瀛樺偍鎶ヨ淇℃伅銆佹姤璀﹀垎绫�
-        if (sipConfig.isAlarm()) {
-            deviceAlarmService.add(deviceAlarm);
-        }
-        logger.info("[鏀跺埌鎶ヨ閫氱煡]鍐呭锛歿}", JSONObject.toJSON(deviceAlarm));
-        if (redisCatchStorage.deviceIsOnline(device.getDeviceId())) {
-            publisher.deviceAlarmEventPublish(deviceAlarm);
-        }
     }
 
     @Override
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
index b5051c0..67d26d7 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -17,6 +17,8 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;
@@ -26,6 +28,7 @@
 import javax.sip.SipException;
 import javax.sip.message.Response;
 import java.text.ParseException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
 
@@ -53,6 +56,14 @@
     @Autowired
     private IDeviceChannelService deviceChannelService;
 
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
     @Override
     public void afterPropertiesSet() throws Exception {
         notifyMessageHandler.addHandler(cmdType, this);
@@ -61,78 +72,91 @@
     @Override
     public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
 
-        try {
-            rootElement = getRootElement(evt, device.getCharset());
-            if (rootElement == null) {
-                logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", evt.getRequest());
-                responseAck(getServerTransaction(evt), Response.BAD_REQUEST);
-                return;
-            }
-            MobilePosition mobilePosition = new MobilePosition();
-            mobilePosition.setCreateTime(DateUtil.getNow());
-            if (!ObjectUtils.isEmpty(device.getName())) {
-                mobilePosition.setDeviceName(device.getName());
-            }
-            mobilePosition.setDeviceId(device.getDeviceId());
-            mobilePosition.setChannelId(getText(rootElement, "DeviceID"));
-            mobilePosition.setTime(getText(rootElement, "Time"));
-            mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));
-            mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));
-            if (NumericUtil.isDouble(getText(rootElement, "Speed"))) {
-                mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed")));
-            } else {
-                mobilePosition.setSpeed(0.0);
-            }
-            if (NumericUtil.isDouble(getText(rootElement, "Direction"))) {
-                mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction")));
-            } else {
-                mobilePosition.setDirection(0.0);
-            }
-            if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) {
-                mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude")));
-            } else {
-                mobilePosition.setAltitude(0.0);
-            }
-            mobilePosition.setReportSource("Mobile Position");
+        taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    SipMsgInfo sipMsgInfo = taskQueue.poll();
+                    try {
+                        Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
+                        if (rootElementAfterCharset == null) {
+                            logger.warn("[ 绉诲姩璁惧浣嶇疆鏁版嵁閫氱煡 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
+                            responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
+                            return;
+                        }
+                        MobilePosition mobilePosition = new MobilePosition();
+                        mobilePosition.setCreateTime(DateUtil.getNow());
+                        if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) {
+                            mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName());
+                        }
+                        mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+                        mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID"));
+                        mobilePosition.setTime(getText(rootElementAfterCharset, "Time"));
+                        mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
+                        mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
+                        if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
+                            mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
+                        } else {
+                            mobilePosition.setSpeed(0.0);
+                        }
+                        if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
+                            mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
+                        } else {
+                            mobilePosition.setDirection(0.0);
+                        }
+                        if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
+                            mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
+                        } else {
+                            mobilePosition.setAltitude(0.0);
+                        }
+                        mobilePosition.setReportSource("Mobile Position");
 
 
-            // 鏇存柊device channel 鐨勭粡绾害
-            DeviceChannel deviceChannel = new DeviceChannel();
-            deviceChannel.setDeviceId(device.getDeviceId());
-            deviceChannel.setChannelId(mobilePosition.getChannelId());
-            deviceChannel.setLongitude(mobilePosition.getLongitude());
-            deviceChannel.setLatitude(mobilePosition.getLatitude());
-            deviceChannel.setGpsTime(mobilePosition.getTime());
+                        // 鏇存柊device channel 鐨勭粡绾害
+                        DeviceChannel deviceChannel = new DeviceChannel();
+                        deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
+                        deviceChannel.setChannelId(mobilePosition.getChannelId());
+                        deviceChannel.setLongitude(mobilePosition.getLongitude());
+                        deviceChannel.setLatitude(mobilePosition.getLatitude());
+                        deviceChannel.setGpsTime(mobilePosition.getTime());
 
-            deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+                        deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
 
-            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
-            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
-            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
-            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+                        mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+                        mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+                        mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+                        mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
 
-            if (userSetting.getSavePositionHistory()) {
-                storager.insertMobilePosition(mobilePosition);
-            }
-            storager.updateChannelPosition(deviceChannel);
-            //鍥炲 200 OK
-            responseAck(getServerTransaction(evt), Response.OK);
+                        if (userSetting.getSavePositionHistory()) {
+                            storager.insertMobilePosition(mobilePosition);
+                        }
+                        storager.updateChannelPosition(deviceChannel);
+                        //鍥炲 200 OK
+                        responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
 
-            // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
-            JSONObject jsonObject = new JSONObject();
-            jsonObject.put("time", mobilePosition.getTime());
-            jsonObject.put("serial", deviceChannel.getDeviceId());
-            jsonObject.put("code", deviceChannel.getChannelId());
-            jsonObject.put("longitude", mobilePosition.getLongitude());
-            jsonObject.put("latitude", mobilePosition.getLatitude());
-            jsonObject.put("altitude", mobilePosition.getAltitude());
-            jsonObject.put("direction", mobilePosition.getDirection());
-            jsonObject.put("speed", mobilePosition.getSpeed());
-            redisCatchStorage.sendMobilePositionMsg(jsonObject);
+                        // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put("time", mobilePosition.getTime());
+                        jsonObject.put("serial", deviceChannel.getDeviceId());
+                        jsonObject.put("code", deviceChannel.getChannelId());
+                        jsonObject.put("longitude", mobilePosition.getLongitude());
+                        jsonObject.put("latitude", mobilePosition.getLatitude());
+                        jsonObject.put("altitude", mobilePosition.getAltitude());
+                        jsonObject.put("direction", mobilePosition.getDirection());
+                        jsonObject.put("speed", mobilePosition.getSpeed());
+                        redisCatchStorage.sendMobilePositionMsg(jsonObject);
 
-        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
-            e.printStackTrace();
+                    } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
+                        e.printStackTrace();
+                    }
+
+                }
+                taskQueueHandlerRun = false;
+            });
         }
+
+
     }
 
     @Override
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index dedc7c1..3c5644b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -58,6 +59,9 @@
 
     @Autowired
     private GbStreamMapper gbStreamMapper;
+
+    @Autowired
+    private UserSetting userSetting;
 
 
 
@@ -241,7 +245,7 @@
         if (subscribe != null) {
 
             // TODO 鏆傛椂鍙鐞嗚棰戞祦鐨勫洖澶�,鍚庣画澧炲姞瀵瑰浗鏍囪澶囩殑鏀寔
-            List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId());
+            List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus());
             if (gbStreams.size() == 0) {
                 return;
             }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index c0b9e95..6662738 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -131,23 +131,6 @@
         StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
         playResult.setDevice(device);
 
-        result.onCompletion(()->{
-            // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
-            taskExecutor.execute(()->{
-                // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
-                String path =  "snap";
-                String fileName =  deviceId + "_" + channelId + ".jpg";
-                WVPResult wvpResult =  (WVPResult)result.getResult();
-                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
-                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
-                    MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
-                    String streamUrl = streamInfoForSuccess.getFmp4();
-                    // 璇锋眰鎴浘
-                    logger.info("[璇锋眰鎴浘]: " + fileName);
-                    zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
-                }
-            });
-        });
         if (streamInfo != null) {
             String streamId = streamInfo.getStream();
             if (streamId == null) {
@@ -209,6 +192,21 @@
             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
             logger.info(JSONObject.toJSONString(ssrcInfo));
             play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
+                // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
+                taskExecutor.execute(()->{
+                    // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
+                    String path =  "snap";
+                    String fileName =  deviceId + "_" + channelId + ".jpg";
+                    WVPResult wvpResult =  (WVPResult)result.getResult();
+                    if (Objects.requireNonNull(wvpResult).getCode() == 0) {
+                        StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
+                        MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
+                        String streamUrl = streamInfoForSuccess.getFmp4();
+                        // 璇锋眰鎴浘
+                        logger.info("[璇锋眰鎴浘]: " + fileName);
+                        zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
+                    }
+                });
                 if (hookEvent != null) {
                     hookEvent.response(mediaServerItem, response);
                 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java
deleted file mode 100644
index 1634234..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.service.IPlatformChannelService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.List;
-
-
-@Component
-public class RedisAlarmMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class);
-
-    @Autowired
-    private ISIPCommander commander;
-
-    @Autowired
-    private ISIPCommanderForPlatform commanderForPlatform;
-
-    @Autowired
-    private IVideoManagerStorage storage;
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody()));
-        AlarmChannelMessage alarmChannelMessage = JSON.parseObject(message.getBody(), AlarmChannelMessage.class);
-        if (alarmChannelMessage == null) {
-            logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触");
-            return;
-        }
-        String gbId = alarmChannelMessage.getGbId();
-
-        DeviceAlarm deviceAlarm = new DeviceAlarm();
-        deviceAlarm.setCreateTime(DateUtil.getNow());
-        deviceAlarm.setChannelId(gbId);
-        deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
-        deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
-        deviceAlarm.setAlarmPriority("1");
-        deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
-        deviceAlarm.setAlarmType("1");
-        deviceAlarm.setLongitude(0);
-        deviceAlarm.setLatitude(0);
-
-        if (ObjectUtils.isEmpty(gbId)) {
-            // 鍙戦�佺粰鎵�鏈夌殑涓婄骇
-            List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
-            if (parentPlatforms.size() > 0) {
-                for (ParentPlatform parentPlatform : parentPlatforms) {
-                    commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
-                }
-            }
-        }else {
-            Device device = storage.queryVideoDevice(gbId);
-            ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
-            if (device != null && platform == null) {
-                commander.sendAlarmMessage(device, deviceAlarm);
-            }else if (device == null && platform != null){
-                commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
-            }else {
-                logger.warn("鏃犳硶纭畾" + gbId + "鏄钩鍙拌繕鏄澶�");
-            }
-        }
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
deleted file mode 100644
index 56c9ff3..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IGbStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
- * @author lin
- */
-@Component
-public class RedisPushStreamResponseListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
-
-    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
-    public interface PushStreamResponseEvent{
-        void run(MessageForPushChannelResponse response);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        //
-        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
-        MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);
-        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
-            logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
-            return;
-        }
-        // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
-        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
-            responseEvents.get(response.getApp() + response.getStream()).run(response);
-        }
-    }
-
-    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
-        responseEvents.put(app + stream, callback);
-    }
-
-    public void removeEvent(String app, String stream) {
-        responseEvents.remove(app + stream);
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
deleted file mode 100644
index bedbf44..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IGbStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.*;
-
-/**
- * @Auther: JiangFeng
- * @Date: 2022/8/16 11:32
- * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
- */
-@Component
-public class RedisPushStreamStatusListMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
-    @Resource
-    private IMediaServerService mediaServerService;
-
-    @Resource
-    private IStreamPushService streamPushService;
-    @Resource
-    private IGbStreamService gbStreamService;
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        //
-        logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
-        List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
-        //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
-        List<String> allAppAndStream = streamPushService.getAllAppAndStream();
-
-        /**
-         * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
-         */
-        List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
-        List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
-        for (StreamPushItem streamPushItem : streamPushItems) {
-            String app = streamPushItem.getApp();
-            String stream = streamPushItem.getStream();
-            boolean contains = allAppAndStream.contains(app + stream);
-            //涓嶅瓨鍦ㄥ氨娣诲姞
-            if (!contains) {
-                streamPushItem.setStreamType("push");
-                streamPushItem.setCreateTime(DateUtil.getNow());
-                streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
-                streamPushItem.setOriginType(2);
-                streamPushItem.setOriginTypeStr("rtsp_push");
-                streamPushItem.setTotalReaderCount("0");
-                streamPushItemForSave.add(streamPushItem);
-            } else {
-                //瀛樺湪灏卞彧淇敼 name鍜実bId
-                streamPushItemForUpdate.add(streamPushItem);
-            }
-        }
-        if (streamPushItemForSave.size() > 0) {
-
-            logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
-            logger.info(JSONObject.toJSONString(streamPushItemForSave));
-            streamPushService.batchAdd(streamPushItemForSave);
-
-        }
-        if(streamPushItemForUpdate.size()>0){
-            logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
-            logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
-            gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
-        }
-
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
deleted file mode 100644
index 118a227..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.genersoft.iot.vmp.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-
-/**
- * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
- * @author lin
- */
-@Component
-public class RedisStreamMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-
-        JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);
-        if (steamMsgJson == null) {
-            logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
-            return;
-        }
-        String serverId = steamMsgJson.getString("serverId");
-
-        if (userSetting.getServerId().equals(serverId)) {
-            // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
-            return;
-        }
-        logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
-        String app = steamMsgJson.getString("app");
-        String stream = steamMsgJson.getString("stream");
-        boolean register = steamMsgJson.getBoolean("register");
-        String mediaServerId = steamMsgJson.getString("mediaServerId");
-        MediaItem mediaItem = new MediaItem();
-        mediaItem.setSeverId(serverId);
-        mediaItem.setApp(app);
-        mediaItem.setStream(stream);
-        mediaItem.setRegist(register);
-        mediaItem.setMediaServerId(mediaServerId);
-        mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
-        mediaItem.setAliveSecond(0L);
-        mediaItem.setTotalReaderCount("0");
-        mediaItem.setOriginType(0);
-        mediaItem.setOriginTypeStr("0");
-        mediaItem.setOriginTypeStr("unknown");
-        if (register) {
-            zlmMediaListManager.addPush(mediaItem);
-        }else {
-            zlmMediaListManager.removeMedia(app, stream);
-        }
-    }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
new file mode 100644
index 0000000..84f5ef7
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -0,0 +1,100 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+@Component
+public class RedisAlarmMsgListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class);
+
+    @Autowired
+    private ISIPCommander commander;
+
+    @Autowired
+    private ISIPCommanderForPlatform commanderForPlatform;
+
+    @Autowired
+    private IVideoManagerStorage storage;
+
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody()));
+
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+
+                    AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
+                    if (alarmChannelMessage == null) {
+                        logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触");
+                        return;
+                    }
+                    String gbId = alarmChannelMessage.getGbId();
+
+                    DeviceAlarm deviceAlarm = new DeviceAlarm();
+                    deviceAlarm.setCreateTime(DateUtil.getNow());
+                    deviceAlarm.setChannelId(gbId);
+                    deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
+                    deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
+                    deviceAlarm.setAlarmPriority("1");
+                    deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
+                    deviceAlarm.setAlarmType("1");
+                    deviceAlarm.setLongitude(0);
+                    deviceAlarm.setLatitude(0);
+
+                    if (ObjectUtils.isEmpty(gbId)) {
+                        // 鍙戦�佺粰鎵�鏈夌殑涓婄骇
+                        List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
+                        if (parentPlatforms.size() > 0) {
+                            for (ParentPlatform parentPlatform : parentPlatforms) {
+                                commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
+                            }
+                        }
+                    }else {
+                        Device device = storage.queryVideoDevice(gbId);
+                        ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
+                        if (device != null && platform == null) {
+                            commander.sendAlarmMessage(device, deviceAlarm);
+                        }else if (device == null && platform != null){
+                            commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
+                        }else {
+                            logger.warn("鏃犳硶纭畾" + gbId + "鏄钩鍙拌繕鏄澶�");
+                        }
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
+
+
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
similarity index 67%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
index e327d0f..e6ba9b6 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.service.impl;
+package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
@@ -19,8 +19,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import java.text.ParseException;
@@ -28,6 +30,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 
 /**
@@ -85,6 +88,14 @@
     @Autowired
     private ZlmHttpHookSubscribe subscribe;
 
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
 
     public interface PlayMsgCallback{
         void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
@@ -100,94 +111,107 @@
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class);
-        WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
-        if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
-            return;
-        }
-        if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
-            logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(message.getBody()));
 
-            switch (wvpRedisMsg.getCmd()){
-                case WvpRedisMsgCmd.GET_SEND_ITEM:
-                    RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
-                    requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-                    break;
-                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
-                    RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
-                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
-                    break;
-                default:
-                    break;
-            }
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
+                    WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
+                    if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
+                        return;
+                    }
+                    if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
+                        logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
 
-        }else {
-            logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(message.getBody()));
-            switch (wvpRedisMsg.getCmd()){
-                case WvpRedisMsgCmd.GET_SEND_ITEM:
+                        switch (wvpRedisMsg.getCmd()){
+                            case WvpRedisMsgCmd.GET_SEND_ITEM:
+                                RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
+                                requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+                                break;
+                            case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
+                                RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
+                                requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+                                break;
+                            default:
+                                break;
+                        }
 
-                    WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
+                    }else {
+                        logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody()));
+                        switch (wvpRedisMsg.getCmd()){
+                            case WvpRedisMsgCmd.GET_SEND_ITEM:
 
-                    String key = wvpRedisMsg.getSerial();
-                    switch (content.getCode()) {
-                        case 0:
-                            ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
-                            PlayMsgCallback playMsgCallback = callbacks.get(key);
-                            if (playMsgCallback != null) {
-                                callbacksForError.remove(key);
-                                try {
-                                    playMsgCallback.handler(responseSendItemMsg);
-                                } catch (ParseException e) {
-                                    throw new RuntimeException(e);
+                                WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
+
+                                String key = wvpRedisMsg.getSerial();
+                                switch (content.getCode()) {
+                                    case 0:
+                                        ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
+                                        PlayMsgCallback playMsgCallback = callbacks.get(key);
+                                        if (playMsgCallback != null) {
+                                            callbacksForError.remove(key);
+                                            try {
+                                                playMsgCallback.handler(responseSendItemMsg);
+                                            } catch (ParseException e) {
+                                                throw new RuntimeException(e);
+                                            }
+                                        }
+                                        break;
+                                    case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
+                                    case ERROR_CODE_OFFLINE:
+                                    case ERROR_CODE_TIMEOUT:
+                                        PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
+                                        if (errorCallback != null) {
+                                            callbacks.remove(key);
+                                            errorCallback.handler(content);
+                                        }
+                                        break;
+                                    default:
+                                        break;
                                 }
-                            }
-                            break;
-                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
-                        case ERROR_CODE_OFFLINE:
-                        case ERROR_CODE_TIMEOUT:
-                            PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
-                            if (errorCallback != null) {
-                                callbacks.remove(key);
-                                errorCallback.handler(content);
-                            }
-                            break;
-                        default:
-                            break;
+                                break;
+                            case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
+                                WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
+                                String serial = wvpRedisMsg.getSerial();
+                                switch (wvpResult.getCode()) {
+                                    case 0:
+                                        JSONObject jsonObject = (JSONObject)wvpResult.getData();
+                                        PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
+                                        if (playMsgCallback != null) {
+                                            callbacksForError.remove(serial);
+                                            playMsgCallback.handler(jsonObject);
+                                        }
+                                        break;
+                                    case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
+                                    case ERROR_CODE_OFFLINE:
+                                    case ERROR_CODE_TIMEOUT:
+                                        PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
+                                        if (errorCallback != null) {
+                                            callbacks.remove(serial);
+                                            errorCallback.handler(wvpResult);
+                                        }
+                                        break;
+                                    default:
+                                        break;
+                                }
+                                break;
+                            default:
+                                break;
+                        }
                     }
-                    break;
-                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
-                    WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
-                    String serial = wvpRedisMsg.getSerial();
-                    switch (wvpResult.getCode()) {
-                        case 0:
-                            JSONObject jsonObject = (JSONObject)wvpResult.getData();
-                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
-                            if (playMsgCallback != null) {
-                                callbacksForError.remove(serial);
-                                playMsgCallback.handler(jsonObject);
-                            }
-                            break;
-                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
-                        case ERROR_CODE_OFFLINE:
-                        case ERROR_CODE_TIMEOUT:
-                            PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
-                            if (errorCallback != null) {
-                                callbacks.remove(serial);
-                                errorCallback.handler(wvpResult);
-                            }
-                            break;
-                        default:
-                            break;
-                    }
-                    break;
-                default:
-                    break;
-            }
+                }
+                taskQueueHandlerRun = false;
+            });
         }
 
 
 
 
+
+
     }
 
     /**
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
similarity index 95%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
index 4e94d68..bb2f4ad 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
@@ -1,7 +1,6 @@
-package com.genersoft.iot.vmp.service.impl;
+package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson.JSON;
-import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
new file mode 100644
index 0000000..13e6874
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -0,0 +1,75 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
+    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+    public interface PushStreamResponseEvent{
+        void run(MessageForPushChannelResponse response);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
+                    if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+                        logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+                        return;
+                    }
+                    // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+                    if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+                        responseEvents.get(response.getApp() + response.getStream()).run(response);
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
+    }
+
+    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+        responseEvents.put(app + stream, callback);
+    }
+
+    public void removeEvent(String app, String stream) {
+        responseEvents.remove(app + stream);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
new file mode 100644
index 0000000..3925836
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -0,0 +1,103 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @Auther: JiangFeng
+ * @Date: 2022/8/16 11:32
+ * @Description: 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧鍒楄〃鏇存柊閫氱煡
+ */
+@Component
+public class RedisPushStreamStatusListMsgListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
+    @Resource
+    private IMediaServerService mediaServerService;
+
+    @Resource
+    private IStreamPushService streamPushService;
+    @Resource
+    private IGbStreamService gbStreamService;
+
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        logger.info("[REDIS娑堟伅-鎺ㄦ祦璁惧鍒楄〃鏇存柊]锛� {}", new String(message.getBody()));
+
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
+                    //鏌ヨ鍏ㄩ儴鐨刟pp+stream 鐢ㄤ簬鍒ゆ柇鏄坊鍔犺繕鏄慨鏀�
+                    List<String> allAppAndStream = streamPushService.getAllAppAndStream();
+
+                    /**
+                     * 鐢ㄤ簬瀛樺偍鏇村叿APP+Stream杩囨护鍚庣殑鏁版嵁锛屽彲浠ョ洿鎺ュ瓨鍏tream_push琛ㄤ笌gb_stream琛�
+                     */
+                    List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
+                    List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
+                    for (StreamPushItem streamPushItem : streamPushItems) {
+                        String app = streamPushItem.getApp();
+                        String stream = streamPushItem.getStream();
+                        boolean contains = allAppAndStream.contains(app + stream);
+                        //涓嶅瓨鍦ㄥ氨娣诲姞
+                        if (!contains) {
+                            streamPushItem.setStreamType("push");
+                            streamPushItem.setCreateTime(DateUtil.getNow());
+                            streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
+                            streamPushItem.setOriginType(2);
+                            streamPushItem.setOriginTypeStr("rtsp_push");
+                            streamPushItem.setTotalReaderCount("0");
+                            streamPushItemForSave.add(streamPushItem);
+                        } else {
+                            //瀛樺湪灏卞彧淇敼 name鍜実bId
+                            streamPushItemForUpdate.add(streamPushItem);
+                        }
+                    }
+                    if (streamPushItemForSave.size() > 0) {
+
+                        logger.info("娣诲姞{}鏉�",streamPushItemForSave.size());
+                        logger.info(JSONObject.toJSONString(streamPushItemForSave));
+                        streamPushService.batchAdd(streamPushItemForSave);
+
+                    }
+                    if(streamPushItemForUpdate.size()>0){
+                        logger.info("淇敼{}鏉�",streamPushItemForUpdate.size());
+                        logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
+                        gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
similarity index 82%
rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
index 50e894a..4b1c2d5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
@@ -1,24 +1,11 @@
-package com.genersoft.iot.vmp.service.impl;
+package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
-import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -30,8 +17,6 @@
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 
@@ -65,7 +50,6 @@
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        // TODO 澧炲姞闃熷垪
         logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鐘舵�佸彉鍖朷锛� {}", new String(message.getBody()));
         taskQueue.offer(message);
 
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
new file mode 100644
index 0000000..1897b6f
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -0,0 +1,91 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+
+import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
+ * @author lin
+ */
+@Component
+public class RedisStreamMsgListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZLMMediaListManager zlmMediaListManager;
+
+    private boolean taskQueueHandlerRun = false;
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
+                    if (steamMsgJson == null) {
+                        logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
+                        return;
+                    }
+                    String serverId = steamMsgJson.getString("serverId");
+
+                    if (userSetting.getServerId().equals(serverId)) {
+                        // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
+                        return;
+                    }
+                    logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
+                    String app = steamMsgJson.getString("app");
+                    String stream = steamMsgJson.getString("stream");
+                    boolean register = steamMsgJson.getBoolean("register");
+                    String mediaServerId = steamMsgJson.getString("mediaServerId");
+                    MediaItem mediaItem = new MediaItem();
+                    mediaItem.setSeverId(serverId);
+                    mediaItem.setApp(app);
+                    mediaItem.setStream(stream);
+                    mediaItem.setRegist(register);
+                    mediaItem.setMediaServerId(mediaServerId);
+                    mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
+                    mediaItem.setAliveSecond(0L);
+                    mediaItem.setTotalReaderCount("0");
+                    mediaItem.setOriginType(0);
+                    mediaItem.setOriginTypeStr("0");
+                    mediaItem.setOriginTypeStr("unknown");
+                    if (register) {
+                        zlmMediaListManager.addPush(mediaItem);
+                    }else {
+                        zlmMediaListManager.removeMedia(app, stream);
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index 5e74d99..8a5d455 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -75,18 +75,23 @@
             "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
     GbStream queryStreamInPlatform(String platformId, String gbId);
 
-    @Select("select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture,  st.status, gt.longitude, gt.latitude, pc.id as parentId," +
+    @Select("<script> "+
+            "select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture,  st.status, gt.longitude, gt.latitude, pc.id as parentId," +
             "       '1' as registerWay, pc.civilCode, 'live' as model, 'wvp-pro' as owner, '0' as parental,'0' as secrecy" +
             " from gb_stream gt " +
             " left join (" +
-            "    select sp.status, sp.app, sp.stream from stream_push sp" +
+            "    select " +
+            " <if test='usPushingAsStatus != true'> sp.status as status, </if>" +
+            " <if test='usPushingAsStatus == true'> sp.pushIng as status, </if>" +
+            "sp.app, sp.stream from stream_push sp" +
             "    union all" +
             "    select spxy.status, spxy.app, spxy.stream from stream_proxy spxy" +
             " ) st on st.app = gt.app and st.stream = gt.stream" +
             " left join platform_gb_stream pgs on  gt.gbStreamId = pgs.gbStreamId" +
             " left join platform_catalog pc on pgs.catalogId = pc.id and pgs.platformId = pc.platformId" +
-            " where pgs.platformId=#{platformId}")
-    List<DeviceChannel> queryGbStreamListInPlatform(String platformId);
+            " where pgs.platformId=#{platformId}" +
+            "</script>")
+    List<DeviceChannel> queryGbStreamListInPlatform(String platformId, boolean usPushingAsStatus);
 
 
     @Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
index 702b5be..f8a74fe 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -2,6 +2,7 @@
 
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
@@ -82,6 +83,9 @@
 
 	@Autowired
     private GbStreamMapper gbStreamMapper;
+
+	@Autowired
+    private UserSetting userSetting;
 
 	@Autowired
     private PlatformCatalogMapper catalogMapper;
@@ -614,7 +618,7 @@
 	 */
 	@Override
 	public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) {
-		return gbStreamMapper.queryGbStreamListInPlatform(platformId);
+		return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus());
 	}
 
 	/**
diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml
index 3569de2..ed025b9 100644
--- a/src/main/resources/all-application.yml
+++ b/src/main/resources/all-application.yml
@@ -188,6 +188,8 @@
     record-sip: true
     # 鏄惁灏嗘棩蹇楀瓨鍌ㄨ繘鏁版嵁搴�
     logInDatebase: true
+    # 浣跨敤鎺ㄦ祦鐘舵�佷綔涓烘帹娴侀�氶亾鐘舵��
+    use-pushing-as-status: true
 
 # 鍏抽棴鍦ㄧ嚎鏂囨。锛堢敓浜х幆澧冨缓璁叧闂級
 springdoc:

--
Gitblit v1.8.0