648540858
2022-11-29 ef6693aabbbf12e83d09ad8749f6e60faacc012d
Merge branch 'wvp-28181-2.0' into Zafu-Dev-1127
17个文件已修改
331 ■■■■■ 已修改文件
sql/mysql.sql 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java 72 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/all-application.yml 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sql/mysql.sql
@@ -1,4 +1,4 @@
-- MySQL dump 10.13  Distrib 8.0.30, for Linux (x86_64)
-- MySQL dump 10.13  Distrib 8.0.31, for Linux (x86_64)
--
-- Host: 127.0.0.1    Database: wvp
-- ------------------------------------------------------
@@ -34,13 +34,13 @@
                          `online` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `registerTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `keepaliveTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `ip` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `ip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                          `updateTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                          `port` int DEFAULT NULL,
                          `expires` int DEFAULT NULL,
                          `subscribeCycleForCatalog` int DEFAULT NULL,
                          `hostAddress` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `hostAddress` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `charset` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                          `subscribeCycleForMobilePosition` int DEFAULT NULL,
                          `mobilePositionSubmissionInterval` int DEFAULT '5',
@@ -48,12 +48,13 @@
                          `ssrcCheck` int DEFAULT '0',
                          `geoCoordSys` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                          `treeType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                          `mediaServerId` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT 'auto',
                          `custom_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `custom_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `sdpIp` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
                          `localIp` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
                          PRIMARY KEY (`id`),
                          UNIQUE KEY `device_deviceId_uindex` (`deviceId`)
) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
) ENGINE=InnoDB AUTO_INCREMENT=57 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -145,7 +146,7 @@
                                  PRIMARY KEY (`id`),
                                  UNIQUE KEY `device_channel_id_uindex` (`id`),
                                  UNIQUE KEY `device_channel_pk` (`channelId`,`deviceId`)
) ENGINE=InnoDB AUTO_INCREMENT=60301 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
) ENGINE=InnoDB AUTO_INCREMENT=74416 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -215,7 +216,7 @@
                             PRIMARY KEY (`gbStreamId`) USING BTREE,
                             UNIQUE KEY `app` (`app`,`stream`) USING BTREE,
                             UNIQUE KEY `gbId` (`gbId`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=301059 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
) ENGINE=InnoDB AUTO_INCREMENT=331060 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -245,7 +246,7 @@
                       `username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                       `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                       PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=733627 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
) ENGINE=InnoDB AUTO_INCREMENT=760908 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -337,7 +338,7 @@
                                   PRIMARY KEY (`id`),
                                   UNIQUE KEY `parent_platform_id_uindex` (`id`),
                                   UNIQUE KEY `parent_platform_pk` (`serverGBId`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -389,7 +390,7 @@
                                       `catalogId` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
                                       `deviceChannelId` int NOT NULL,
                                       PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
) ENGINE=InnoDB AUTO_INCREMENT=3146 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -415,7 +416,7 @@
                                      `id` int NOT NULL AUTO_INCREMENT,
                                      PRIMARY KEY (`id`),
                                      UNIQUE KEY `platform_gb_stream_pk` (`platformId`,`catalogId`,`gbStreamId`)
) ENGINE=InnoDB AUTO_INCREMENT=301766 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
) ENGINE=InnoDB AUTO_INCREMENT=391772 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -457,7 +458,7 @@
                                `enable_disable_none_reader` bit(1) DEFAULT NULL,
                                PRIMARY KEY (`id`),
                                UNIQUE KEY `stream_proxy_pk` (`app`,`stream`)
) ENGINE=InnoDB AUTO_INCREMENT=548 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
) ENGINE=InnoDB AUTO_INCREMENT=568 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -494,7 +495,7 @@
                               `self` int DEFAULT NULL,
                               PRIMARY KEY (`id`),
                               UNIQUE KEY `stream_push_pk` (`app`,`stream`)
) ENGINE=InnoDB AUTO_INCREMENT=310558 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
) ENGINE=InnoDB AUTO_INCREMENT=361492 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
@@ -572,4 +573,4 @@
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2022-10-18 17:00:02
-- Dump completed on 2022-11-29 11:47:46
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -29,8 +29,6 @@
    @Autowired
    private ISIPProcessorObserver sipProcessorObserver;
    private final Map<String, SipProviderImpl> tcpSipProviderMap = new ConcurrentHashMap<>();
    private final Map<String, SipProviderImpl> udpSipProviderMap = new ConcurrentHashMap<>();
src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
@@ -25,10 +25,9 @@
 */
package com.genersoft.iot.vmp.gb28181.auth;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Random;
import gov.nist.core.InternalErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sip.address.URI;
import javax.sip.header.AuthorizationHeader;
@@ -36,10 +35,10 @@
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import gov.nist.core.InternalErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Random;
/**
 * Implements the HTTP digest authentication method server side functionality.
@@ -201,12 +200,13 @@
        // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16));
        String A1 = username + ":" + realm + ":" + pass;
        String A2 = request.getMethod().toUpperCase() + ":" + uri.toString();
        byte mdbytes[] = messageDigest.digest(A1.getBytes());
        String HA1 = toHexString(mdbytes);
        logger.debug("A1: " + A1);
        logger.debug("A2: " + A2);
        mdbytes = messageDigest.digest(A2.getBytes());
        String HA2 = toHexString(mdbytes);
        logger.debug("HA1: " + HA1);
@@ -238,58 +238,4 @@
    }
//     public static void main(String[] args) throws NoSuchAlgorithmException {
//         String realm = "3402000000";
//         String username = "44010000001180008012";
//         String nonce = "07cab60999fbf643264ace27d3b7de8b";
//         String uri = "sip:34020000002000000001@3402000000";
//         // qop 保护质量 包含auth(默认的)和auth-int(增加了报文完整性检测)两种策略
//         String qop = "auth";
//         // 客户端随机数,这是一个不透明的字符串值,由客户端提供,并且客户端和服务器都会使用,以避免用明文文本。
//         // 这使得双方都可以查验对方的身份,并对消息的完整性提供一些保护
//         //String cNonce = authHeader.getCNonce();
//         // nonce计数器,是一个16进制的数值,表示同一nonce下客户端发送出请求的数量
//         int nc = 1;
//         String ncStr = new DecimalFormat("00000000").format(nc);
// //        String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16));
//         MessageDigest messageDigest = MessageDigest.getInstance(DEFAULT_ALGORITHM);
//         String A1 = username + ":" + realm + ":" + "12345678";
//         String A2 = "REGISTER" + ":" + uri;
//         byte mdbytes[] = messageDigest.digest(A1.getBytes());
//         String HA1 = toHexString(mdbytes);
//         System.out.println("A1: " + A1);
//         System.out.println("A2: " + A2);
//         mdbytes = messageDigest.digest(A2.getBytes());
//         String HA2 = toHexString(mdbytes);
//         System.out.println("HA1: " + HA1);
//         System.out.println("HA2: " + HA2);
//         String cnonce = "0a4f113b";
//         System.out.println("nonce: " + nonce);
//         System.out.println("nc: " + ncStr);
//         System.out.println("cnonce: " + cnonce);
//         System.out.println("qop: " + qop);
//         String KD = HA1 + ":" + nonce;
//         if (qop != null && qop.equals("auth") ) {
//             if (nc != -1) {
//                 KD += ":" + ncStr;
//             }
//             if (cnonce != null) {
//                 KD += ":" + cnonce;
//             }
//             KD += ":" + qop;
//         }
//         KD += ":" + HA2;
//         System.out.println("KD: " + KD);
//         mdbytes = messageDigest.digest(KD.getBytes());
//         String mdString = toHexString(mdbytes);
//         System.out.println("mdString: " + mdString);
//         String response = "4f0507d4b87cdecff04bdaf4c96348f0";
//         System.out.println("response: " + response);
//     }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
@@ -9,13 +8,13 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
@@ -26,8 +25,10 @@
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.*;
import javax.sip.header.*;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.ArrayList;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -11,7 +11,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
@@ -31,7 +30,6 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@@ -77,8 +75,6 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -98,9 +94,9 @@
        }catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
        boolean runed = !taskQueue.isEmpty();
        taskQueue.offer(new HandlerCatchData(evt, null, null));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (!runed) {
            taskExecutor.execute(()-> {
                while (!taskQueue.isEmpty()) {
                    try {
@@ -128,7 +124,6 @@
                        logger.error("处理NOTIFY消息时错误", e);
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -9,7 +9,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
@@ -27,17 +26,15 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
 * 报警事件的处理,参考:9.4
@@ -72,8 +69,6 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -89,20 +84,20 @@
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        logger.info("[收到报警通知]设备:{}", device.getDeviceId());
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        // 回复200 OK
        try {
            responseAck((SIPRequest) evt.getRequest(), Response.OK);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
        }
        if (isEmpty) {
            taskExecutor.execute(() -> {
                logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
                while (!taskQueue.isEmpty()) {
                    SipMsgInfo sipMsgInfo = taskQueue.poll();
                    // 回复200 OK
                    try {
                        responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[处理报警通知], 回复200OK失败", e);
                    }
                        SipMsgInfo sipMsgInfo = taskQueue.poll();
                    Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
                    String channelId = deviceIdElement.getText().toString();
@@ -205,12 +200,12 @@
                    if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
                        publisher.deviceAlarmEventPublish(deviceAlarm);
                    }
                    }catch (Exception e) {
                        logger.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -22,7 +22,6 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@@ -57,8 +56,6 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -73,21 +70,22 @@
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        // 回复200 OK
        try {
            responseAck((SIPRequest) evt.getRequest(), Response.OK);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage());
        }
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    SipMsgInfo sipMsgInfo = taskQueue.poll();
                    try {
                        Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
                        if (rootElementAfterCharset == null) {
                            try {
                                logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
                                responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.BAD_REQUEST);
                            } catch (SipException | InvalidArgumentException | ParseException e) {
                                logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage());
                            }
                            logger.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId());
                            continue;
                        }
                        MobilePosition mobilePosition = new MobilePosition();
@@ -137,12 +135,6 @@
                            storager.insertMobilePosition(mobilePosition);
                        }
                        storager.updateChannelPosition(deviceChannel);
                        //回复 200 OK
                        try {
                            responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage());
                        }
                        // 发送redis消息。 通知位置信息的变化
                        JSONObject jsonObject = new JSONObject();
@@ -158,14 +150,12 @@
                    } catch (DocumentException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        logger.warn("[移动位置通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -1,18 +1,11 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
@@ -24,7 +17,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@@ -45,12 +37,10 @@
    private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
    private final String cmdType = "Catalog";
    private boolean taskQueueHandlerRun = false;
    @Autowired
    private ResponseMessageHandler responseMessageHandler;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private IVideoManagerStorage storager;
@@ -69,6 +59,7 @@
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element element) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(new HandlerCatchData(evt, device, element));
        // 回复200 OK
        try {
@@ -76,10 +67,12 @@
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
        }
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        // 如果不为空则说明已经开启消息处理
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    // 全局异常捕获,保证下一条可以得到处理
                    try {
                    HandlerCatchData take = taskQueue.poll();
                    Element rootElement = null;
                    try {
@@ -135,8 +128,10 @@
                        }
                    }
                    }catch (Exception e) {
                        logger.warn("[收到通道] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -9,7 +9,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -21,17 +20,17 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@@ -46,7 +45,6 @@
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    private boolean taskQueueHandlerRun = false;
    @Autowired
    private ResponseMessageHandler responseMessageHandler;
@@ -70,6 +68,7 @@
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        boolean isEmpty = taskQueue.isEmpty();
        try {
            // 回复200 OK
             responseAck((SIPRequest) evt.getRequest(), Response.OK);
@@ -77,8 +76,7 @@
            logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
        }
        taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(()->{
                while (!taskQueue.isEmpty()) {
                    try {
@@ -151,9 +149,10 @@
                        }
                    } catch (DocumentException e) {
                        logger.error("xml解析异常: ", e);
                    } catch (Exception e) {
                        logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -38,8 +38,6 @@
    @Autowired
    private IVideoManagerStorage storage;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -49,15 +47,14 @@
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
                    if (alarmChannelMessage == null) {
                        logger.warn("[REDIS的ALARM通知]消息解析失败");
@@ -107,11 +104,11 @@
                            logger.warn("无法确定" + gbId + "是平台还是设备");
                        }
                    }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -88,8 +88,6 @@
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -111,13 +109,13 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
                    WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
                    if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
@@ -132,7 +130,7 @@
                                requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                break;
                            case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());;
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                break;
                            default:
@@ -201,17 +199,14 @@
                            default:
                                break;
                        }
                        }
                    }catch (Exception e) {
                        logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage());
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }
    /**
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
@@ -27,8 +27,6 @@
    private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
    private boolean taskQueueHandlerRun = false;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -44,17 +42,20 @@
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
                    // 只是放入redis缓存起来
                    redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,8 +25,6 @@
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -43,13 +40,14 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
                    if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
                        logger.info("[REDIS消息-请求推流结果]:参数不全");
@@ -59,8 +57,10 @@
                    if (responseEvents.get(response.getApp() + response.getStream()) != null) {
                        responseEvents.get(response.getApp() + response.getStream()).run(response);
                    }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -6,7 +6,6 @@
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,7 +17,8 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -38,7 +38,6 @@
    @Resource
    private IGbStreamService gbStreamService;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@@ -49,13 +48,13 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
                    //查询全部的app+stream 用于判断是添加还是修改
                    List<String> allAppAndStream = streamPushService.getAllAppAndStream();
@@ -95,8 +94,10 @@
                        logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
                        gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
                    }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
@@ -29,8 +29,6 @@
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
    private boolean taskQueueHandlerRun = false;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -50,14 +48,15 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
                    if (statusChangeFromPushStream == null) {
                        logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
@@ -79,8 +78,10 @@
                        // 更新部分设备上线
                        streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
                    }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -33,8 +33,6 @@
    @Autowired
    private ZLMMediaListManager zlmMediaListManager;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -43,13 +41,13 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
                    if (steamMsgJson == null) {
                        logger.warn("[收到redis 流变化]消息解析失败");
@@ -83,8 +81,10 @@
                    }else {
                        zlmMediaListManager.removeMedia(app, stream);
                    }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }
src/main/resources/all-application.yml
@@ -150,8 +150,6 @@
        enable: true
        # [可选] 在此范围内选择端口用于媒体流传输, 必须提前在zlm上配置该属性,不然自动配置此属性可能不成功
        port-range: 30000,30500 # 端口范围
        # [可选] 国标级联在此范围内选择端口发送媒体流
        send-port-range: 30000,30500 # 端口范围
    # 录像辅助服务, 部署此服务可以实现zlm录像的管理与下载, 0 表示不使用
    record-assist-port: 0