648540858
2022-10-17 4c66f6b29eda2d459aed86f7a138438191de7e47
优化消息处理中存在可能异常的处理流程
12个文件已修改
720 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 256 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 72 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java 168 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -132,7 +132,11 @@
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                // 参数不全, 发400,请求错误
                responseAck(serverTransaction, Response.BAD_REQUEST);
                try {
                    responseAck(serverTransaction, Response.BAD_REQUEST);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
                }
                return;
            }
@@ -141,6 +145,7 @@
            ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
            if (platform == null) {
                inviteFromDeviceHandle(serverTransaction, requesterId);
            } else {
                // 查询平台下是否有该通道
                DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
@@ -158,7 +163,11 @@
//                        return;
//                    }
                    // 通道存在,发100,TRYING
                    responseAck(serverTransaction, Response.TRYING);
                    try {
                        responseAck(serverTransaction, Response.TRYING);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite TRYING: {}", e.getMessage());
                    }
                } else if (channel == null && gbStream != null) {
                    String mediaServerId = gbStream.getMediaServerId();
@@ -166,13 +175,21 @@
                    if (mediaServerItem == null) {
                        if ("proxy".equals(gbStream.getStreamType())) {
                            logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                            responseAck(serverTransaction, Response.GONE);
                            try {
                                responseAck(serverTransaction, Response.GONE);
                            } catch (SipException | InvalidArgumentException | ParseException e) {
                                logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                            }
                            return;
                        } else {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(serverTransaction, Response.GONE);
                                try {
                                    responseAck(serverTransaction, Response.GONE);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                                }
                                return;
                            }
                        }
@@ -181,25 +198,47 @@
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(serverTransaction, Response.GONE);
                                try {
                                    responseAck(serverTransaction, Response.GONE);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                                }
                                return;
                            }
                        }else if("proxy".equals(gbStream.getStreamType())){
                            proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
                            if (proxyByAppAndStream == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(serverTransaction, Response.GONE);
                                try {
                                    responseAck(serverTransaction, Response.GONE);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                                }
                                return;
                            }
                        }
                    }
                    responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
                    try {
                        responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage());
                    }
                } else if (catalog != null) {
                    responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播
                    try {
                        // 目录不支持点播
                        responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play");
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage());
                    }
                    return;
                } else {
                    logger.info("通道不存在,返回404");
                    responseAck(serverTransaction, Response.NOT_FOUND); // 通道不存在,发404,资源不存在
                    try {
                        // 通道不存在,发404,资源不存在
                        responseAck(serverTransaction, Response.NOT_FOUND);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage());
                    }
                    return;
                }
                // 解析sdp消息, 使用jainsip 自带的sdp解析方式
@@ -270,7 +309,12 @@
                if (port == -1) {
                    logger.info("不支持的媒体格式,返回415");
                    // 回复不支持的格式
                    responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
                    try {
                        // 不支持的格式,发415
                        responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 不支持的格式: {}", e.getMessage());
                    }
                    return;
                }
                String username = sdp.getOrigin().getUsername();
@@ -283,13 +327,21 @@
                    device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
                    if (device == null) {
                        logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
                        responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR);
                        try {
                            responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage());
                        }
                        return;
                    }
                    mediaServerItem = playService.getNewMediaServerItem(device);
                    if (mediaServerItem == null) {
                        logger.warn("未找到可用的zlm");
                        responseAck(serverTransaction, Response.BUSY_HERE);
                        try {
                            responseAck(serverTransaction, Response.BUSY_HERE);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage());
                        }
                        return;
                    }
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
@@ -301,7 +353,11 @@
                    }
                    if (sendRtpItem == null) {
                        logger.warn("服务器端口资源不足");
                        responseAck(serverTransaction, Response.BUSY_HERE);
                        try {
                            responseAck(serverTransaction, Response.BUSY_HERE);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                        }
                        return;
                    }
                    sendRtpItem.setCallId(callIdHeader.getCallId());
@@ -474,13 +530,8 @@
                    }
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
            logger.warn("sdp解析错误");
            e.printStackTrace();
        } catch (SdpParseException e) {
            e.printStackTrace();
            logger.error("sdp解析错误", e);
        } catch (SdpException e) {
            e.printStackTrace();
        }
@@ -492,7 +543,7 @@
    private void pushProxyStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady) {
                // 自平台内容
@@ -502,7 +553,11 @@
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    responseAck(serverTransaction, Response.BUSY_HERE);
                    try {
                        responseAck(serverTransaction, Response.BUSY_HERE);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                    }
                    return;
                }
                if (tcpActive != null) {
@@ -527,7 +582,7 @@
    private void pushStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
                            String channelId, String addressStr, String ssrc, String requesterId) {
        // 推流
        if (streamPushItem.isSelf()) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
@@ -539,7 +594,11 @@
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    responseAck(serverTransaction, Response.BUSY_HERE);
                    try {
                        responseAck(serverTransaction, Response.BUSY_HERE);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                    }
                    return;
                }
                if (tcpActive != null) {
@@ -577,15 +636,23 @@
    private void notifyStreamOnline(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        if ("proxy".equals(gbStream.getStreamType())) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
            try {
                responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
                responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                try {
                    responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                return;
            }
            // 发送redis消息以使设备上线
@@ -713,38 +780,28 @@
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                }, (wvpResult) -> {
                    try {
                        // 错误
                        if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                            // 离线
                            // 查询是否在本机上线了
                            StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                            if (currentStreamPushItem.isPushIng()) {
                                // 在线状态
                                pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            } else {
                                // 不在线 拉起
                                notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                    // 错误
                    if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                        // 离线
                        // 查询是否在本机上线了
                        StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                        if (currentStreamPushItem.isPushIng()) {
                            // 在线状态
                            pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // 不在线 拉起
                            notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    } catch (InvalidArgumentException | ParseException | SipException e) {
                        logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
                    }
                    try {
                        responseAck(serverTransaction, Response.BUSY_HERE);
                    } catch (SipException e) {
                        e.printStackTrace();
                    } catch (InvalidArgumentException e) {
                        e.printStackTrace();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    } catch (InvalidArgumentException | ParseException | SipException e) {
                        logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage());
                    }
                    return;
                });
    }
@@ -782,14 +839,17 @@
        return null;
    }
    public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
    public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) {
        // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
        Device device = redisCatchStorage.getDevice(requesterId);
        if (device != null) {
            logger.info("收到设备" + requesterId + "的语音广播Invite请求");
            responseAck(serverTransaction, Response.TRYING);
            try {
                responseAck(serverTransaction, Response.TRYING);
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
            }
            String contentString = new String(serverTransaction.getRequest().getRawContent());
            // jainSip不支持y=字段, 移除移除以解析。
            String substring = contentString;
@@ -803,51 +863,65 @@
            if (ssrcIndex > 0) {
                substring = contentString.substring(0, ssrcIndex);
            }
            SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
            SessionDescription sdp = null;
            try {
                sdp = SdpFactory.getInstance().createSessionDescription(substring);
                //  获取支持的格式
                Vector mediaDescriptions = sdp.getMediaDescriptions(true);
                // 查看是否支持PS 负载96
                int port = -1;
                //boolean recvonly = false;
                boolean mediaTransmissionTCP = false;
                Boolean tcpActive = null;
                for (int i = 0; i < mediaDescriptions.size(); i++) {
                    MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i);
                    Media media = mediaDescription.getMedia();
            //  获取支持的格式
            Vector mediaDescriptions = sdp.getMediaDescriptions(true);
            // 查看是否支持PS 负载96
            int port = -1;
            //boolean recvonly = false;
            boolean mediaTransmissionTCP = false;
            Boolean tcpActive = null;
            for (int i = 0; i < mediaDescriptions.size(); i++) {
                MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i);
                Media media = mediaDescription.getMedia();
                Vector mediaFormats = media.getMediaFormats(false);
                if (mediaFormats.contains("8")) {
                    port = media.getMediaPort();
                    String protocol = media.getProtocol();
                    // 区分TCP发流还是udp, 当前默认udp
                    if ("TCP/RTP/AVP".equals(protocol)) {
                        String setup = mediaDescription.getAttribute("setup");
                        if (setup != null) {
                            mediaTransmissionTCP = true;
                            if ("active".equals(setup)) {
                                tcpActive = true;
                            } else if ("passive".equals(setup)) {
                                tcpActive = false;
                    Vector mediaFormats = media.getMediaFormats(false);
                    if (mediaFormats.contains("8")) {
                        port = media.getMediaPort();
                        String protocol = media.getProtocol();
                        // 区分TCP发流还是udp, 当前默认udp
                        if ("TCP/RTP/AVP".equals(protocol)) {
                            String setup = mediaDescription.getAttribute("setup");
                            if (setup != null) {
                                mediaTransmissionTCP = true;
                                if ("active".equals(setup)) {
                                    tcpActive = true;
                                } else if ("passive".equals(setup)) {
                                    tcpActive = false;
                                }
                            }
                        }
                        break;
                    }
                    break;
                }
                if (port == -1) {
                    logger.info("不支持的媒体格式,返回415");
                    // 回复不支持的格式
                    try {
                        responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 不支持的媒体格式,返回415, {}", e.getMessage());
                    }
                    return;
                }
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getOrigin().getAddress();
                logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
            } catch (SdpException e) {
                logger.error("[SDP解析异常]", e);
            }
            if (port == -1) {
                logger.info("不支持的媒体格式,返回415");
                // 回复不支持的格式
                responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
                return;
            }
            String username = sdp.getOrigin().getUsername();
            String addressStr = sdp.getOrigin().getAddress();
            logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
        } else {
            logger.warn("来自无效设备/平台的请求");
            responseAck(serverTransaction, Response.BAD_REQUEST);
            try {
                responseAck(serverTransaction, Response.BAD_REQUEST);; // 不支持的格式,发415
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage());
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -93,46 +93,44 @@
    @Override
    public void process(RequestEvent evt) {
        ServerTransaction serverTransaction = getServerTransaction(evt);
        try {
            taskQueue.offer(new HandlerCatchData(evt, null, null));
            ServerTransaction serverTransaction = getServerTransaction(evt);
            responseAck(serverTransaction, Response.OK);
            if (!taskQueueHandlerRun) {
                taskQueueHandlerRun = true;
                taskExecutor.execute(()-> {
                    while (!taskQueue.isEmpty()) {
                        try {
                            HandlerCatchData take = taskQueue.poll();
                            Element rootElement = getRootElement(take.getEvt());
                            if (rootElement == null) {
                                logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                                continue;
                            }
                            String cmd = XmlUtil.getText(rootElement, "CmdType");
                            if (CmdType.CATALOG.equals(cmd)) {
                                logger.info("接收到Catalog通知");
                                processNotifyCatalogList(take.getEvt());
                            } else if (CmdType.ALARM.equals(cmd)) {
                                logger.info("接收到Alarm通知");
                                processNotifyAlarm(take.getEvt());
                            } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                                logger.info("接收到MobilePosition通知");
                                processNotifyMobilePosition(take.getEvt());
                            } else {
                                logger.info("接收到消息:" + cmd);
                            }
                        } catch (DocumentException e) {
                            logger.error("处理NOTIFY消息时错误", e);
                        }
                    }
                    taskQueueHandlerRun = false;
                });
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
        }catch (SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        } finally {
            taskQueueHandlerRun = false;
        }
        taskQueue.offer(new HandlerCatchData(evt, null, null));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
            taskExecutor.execute(()-> {
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
                        Element rootElement = getRootElement(take.getEvt());
                        if (rootElement == null) {
                            logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                            continue;
                        }
                        String cmd = XmlUtil.getText(rootElement, "CmdType");
                        if (CmdType.CATALOG.equals(cmd)) {
                            logger.info("接收到Catalog通知");
                            processNotifyCatalogList(take.getEvt());
                        } else if (CmdType.ALARM.equals(cmd)) {
                            logger.info("接收到Alarm通知");
                            processNotifyAlarm(take.getEvt());
                        } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                            logger.info("接收到MobilePosition通知");
                            processNotifyMobilePosition(take.getEvt());
                        } else {
                            logger.info("接收到消息:" + cmd);
                        }
                    } catch (DocumentException e) {
                        logger.error("处理NOTIFY消息时错误", e);
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java
@@ -112,10 +112,10 @@
            if (deviceForPlatform == null) {
                try {
                    responseAck(serverTransaction, Response.NOT_FOUND);
                    return;
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 错误信息: {}", e.getMessage());
                }
                return;
            }
            try {
                cmder.fronEndCmd(deviceForPlatform, channelId, cmdString, eventResult -> {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
@@ -52,35 +52,36 @@
            // 未注册的设备不做处理
            return;
        }
        // 回复200 OK
        try {
            // 判断RPort是否改变,改变则说明路由nat信息变化,修改设备信息
            // 获取到通信地址等信息
            ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME);
            String received = viaHeader.getReceived();
            int rPort = viaHeader.getRPort();
            // 解析本地地址替代
            if (ObjectUtils.isEmpty(received) || rPort == -1) {
                received = viaHeader.getHost();
                rPort = viaHeader.getPort();
            }
            if (device.getPort() != rPort) {
                device.setPort(rPort);
                device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
            }
            device.setKeepaliveTime(DateUtil.getNow());
            // 回复200 OK
            responseAck(getServerTransaction(evt), Response.OK);
            if (device.getOnline() == 1) {
                deviceService.updateDevice(device);
            }else {
                // 对于已经离线的设备判断他的注册是否已经过期
                if (!deviceService.expire(device)){
                    deviceService.online(device);
                }
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 心跳回复: {}", e.getMessage());
        }
        // 判断RPort是否改变,改变则说明路由nat信息变化,修改设备信息
        // 获取到通信地址等信息
        ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME);
        String received = viaHeader.getReceived();
        int rPort = viaHeader.getRPort();
        // 解析本地地址替代
        if (ObjectUtils.isEmpty(received) || rPort == -1) {
            received = viaHeader.getHost();
            rPort = viaHeader.getPort();
        }
        if (device.getPort() != rPort) {
            device.setPort(rPort);
            device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
        }
        device.setKeepaliveTime(DateUtil.getNow());
        if (device.getOnline() == 1) {
            deviceService.updateDevice(device);
        }else {
            // 对于已经离线的设备判断他的注册是否已经过期
            if (!deviceService.expire(device)){
                deviceService.online(device);
            }
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -81,8 +81,12 @@
                    try {
                        Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
                        if (rootElementAfterCharset == null) {
                            logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
                            responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
                            try {
                                logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
                                responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
                            } catch (SipException | InvalidArgumentException | ParseException e) {
                                logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage());
                            }
                            continue;
                        }
                        MobilePosition mobilePosition = new MobilePosition();
@@ -133,7 +137,11 @@
                        }
                        storager.updateChannelPosition(deviceChannel);
                        //回复 200 OK
                        responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
                        try {
                            responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage());
                        }
                        // 发送redis消息。 通知位置信息的变化
                        JSONObject jsonObject = new JSONObject();
@@ -147,7 +155,7 @@
                        jsonObject.put("speed", mobilePosition.getSpeed());
                        redisCatchStorage.sendMobilePositionMsg(jsonObject);
                    } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
                    } catch (DocumentException e) {
                        e.printStackTrace();
                    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
@@ -67,33 +67,37 @@
        try {
            // 回复200 OK
            responseAck(getServerTransaction(evt), Response.OK);
            Element snElement = rootElement.element("SN");
            String sn = snElement.getText();
            // 准备回复通道信息
            List<DeviceChannel> deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId());
            // 查询关联的直播通道
            List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
            // 回复目录信息
            List<DeviceChannel> catalogs =  storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 目录查询回复200OK: {}", e.getMessage());
        }
        Element snElement = rootElement.element("SN");
        String sn = snElement.getText();
        // 准备回复通道信息
        List<DeviceChannel> deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId());
        // 查询关联的直播通道
        List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
        // 回复目录信息
        List<DeviceChannel> catalogs =  storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
            List<DeviceChannel> allChannels = new ArrayList<>();
        List<DeviceChannel> allChannels = new ArrayList<>();
            // 回复平台
        // 回复平台
//            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
//            allChannels.add(deviceChannel);
            // 回复目录
            if (catalogs.size() > 0) {
                allChannels.addAll(catalogs);
            }
            // 回复级联的通道
            if (deviceChannelInPlatforms.size() > 0) {
                allChannels.addAll(deviceChannelInPlatforms);
            }
            // 回复直播的通道
            if (gbStreams.size() > 0) {
                allChannels.addAll(gbStreams);
            }
        // 回复目录
        if (catalogs.size() > 0) {
            allChannels.addAll(catalogs);
        }
        // 回复级联的通道
        if (deviceChannelInPlatforms.size() > 0) {
            allChannels.addAll(deviceChannelInPlatforms);
        }
        // 回复直播的通道
        if (gbStreams.size() > 0) {
            allChannels.addAll(gbStreams);
        }
        try {
            if (allChannels.size() > 0) {
                cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag());
            }else {
@@ -101,9 +105,11 @@
                cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0);
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 目录查询: {}", e.getMessage());
            logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
        }
    }
    private DeviceChannel getChannelForPlatform(ParentPlatform platform) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java
@@ -53,19 +53,20 @@
        try {
            // 回复200 OK
            responseAck(getServerTransaction(evt), Response.OK);
            // 此处是对本平台发出DeviceControl指令的应答
            JSONObject json = new JSONObject();
            XmlUtil.node2Json(element, json);
            if (logger.isDebugEnabled()) {
                logger.debug(json.toJSONString());
            }
            RequestMessage msg = new RequestMessage();
            msg.setKey(key);
            msg.setData(json);
            deferredResultHolder.invokeAllResult(msg);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 设备配置查询: {}", e.getMessage());
            logger.error("[命令发送失败] 设备配置查询: {}", e.getMessage());
        }
        // 此处是对本平台发出DeviceControl指令的应答
        JSONObject json = new JSONObject();
        XmlUtil.node2Json(element, json);
        if (logger.isDebugEnabled()) {
            logger.debug(json.toJSONString());
        }
        RequestMessage msg = new RequestMessage();
        msg.setKey(key);
        msg.setData(json);
        deferredResultHolder.invokeAllResult(msg);
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java
@@ -47,20 +47,21 @@
        // 此处是对本平台发出DeviceControl指令的应答
        try {
            responseAck(getServerTransaction(evt), Response.OK);
            JSONObject json = new JSONObject();
            String channelId = getText(element, "DeviceID");
            XmlUtil.node2Json(element, json);
            if (logger.isDebugEnabled()) {
                logger.debug(json.toJSONString());
            }
            RequestMessage msg = new RequestMessage();
            String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL +  device.getDeviceId() + channelId;
            msg.setKey(key);
            msg.setData(json);
            deferredResultHolder.invokeAllResult(msg);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 设备控制: {}", e.getMessage());
        }
        JSONObject json = new JSONObject();
        String channelId = getText(element, "DeviceID");
        XmlUtil.node2Json(element, json);
        if (logger.isDebugEnabled()) {
            logger.debug(json.toJSONString());
        }
        RequestMessage msg = new RequestMessage();
        String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL +  device.getDeviceId() + channelId;
        msg.setKey(key);
        msg.setData(json);
        deferredResultHolder.invokeAllResult(msg);
    }
    @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java
@@ -78,9 +78,14 @@
        ServerTransaction serverTransaction = getServerTransaction(evt);
        try {
            rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
        if (rootElement == null) {
                logger.warn("[ 接收到DeviceInfo应答消息 ] content cannot be null, {}", evt.getRequest());
                responseAck(serverTransaction, Response.BAD_REQUEST);
                try {
                    responseAck(serverTransaction, Response.BAD_REQUEST);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] DeviceInfo应答消息 BAD_REQUEST: {}", e.getMessage());
                }
                return;
            }
            Element deviceIdElement = rootElement.element("DeviceID");
@@ -100,17 +105,16 @@
            msg.setKey(key);
            msg.setData(device);
            deferredResultHolder.invokeAllResult(msg);
        } catch (DocumentException e) {
            throw new RuntimeException(e);
        }
        try {
            // 回复200 OK
            responseAck(serverTransaction, Response.OK);
        } catch (DocumentException e) {
            e.printStackTrace();
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        } catch (SipException e) {
            e.printStackTrace();
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] DeviceInfo应答消息 200: {}", e.getMessage());
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java
@@ -71,7 +71,11 @@
            rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
                logger.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest());
                responseAck(serverTransaction, Response.BAD_REQUEST);
                try {
                    responseAck(serverTransaction, Response.BAD_REQUEST);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage());
                }
                return;
            }
            MobilePosition mobilePosition = new MobilePosition();
@@ -133,8 +137,13 @@
            jsonObject.put("speed", mobilePosition.getSpeed());
            redisCatchStorage.sendMobilePositionMsg(jsonObject);
            //回复 200 OK
            responseAck(serverTransaction, Response.OK);
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            try {
                responseAck(serverTransaction, Response.OK);
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
            }
        } catch (DocumentException e) {
            e.printStackTrace();
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java
@@ -58,7 +58,11 @@
            if (rootElement == null) {
                logger.warn("[ 设备预置位查询应答 ] content cannot be null, {}", evt.getRequest());
                responseAck(serverTransaction, Response.BAD_REQUEST);
                try {
                    responseAck(serverTransaction, Response.BAD_REQUEST);
                } catch (InvalidArgumentException | ParseException | SipException e) {
                    logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
                }
                return;
            }
            Element presetListNumElement = rootElement.element("PresetList");
@@ -67,7 +71,11 @@
            String deviceId = getText(rootElement, "DeviceID");
            String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId;
            if (snElement == null || presetListNumElement == null) {
                responseAck(serverTransaction, Response.BAD_REQUEST, "xml error");
                try {
                    responseAck(serverTransaction, Response.BAD_REQUEST, "xml error");
                } catch (InvalidArgumentException | ParseException | SipException e) {
                    logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
                }
                return;
            }
            int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num"));
@@ -94,11 +102,13 @@
            requestMessage.setKey(key);
            requestMessage.setData(presetQuerySipReqList);
            deferredResultHolder.invokeAllResult(requestMessage);
            responseAck(serverTransaction, Response.OK);
            try {
                responseAck(serverTransaction, Response.OK);
            } catch (InvalidArgumentException | ParseException | SipException e) {
                logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
            }
        } catch (DocumentException e) {
            logger.error("[解析xml]失败: ", e);
        } catch (InvalidArgumentException | ParseException | SipException e) {
            logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -69,95 +69,91 @@
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        // 回复200 OK
        try {
            // 回复200 OK
            responseAck(getServerTransaction(evt), Response.OK);
            taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
            if (!taskQueueHandlerRun) {
                taskQueueHandlerRun = true;
                taskExecutor.execute(()->{
                    while (!taskQueue.isEmpty()) {
                        try {
                            HandlerCatchData take = taskQueue.poll();
                            Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
                            if (rootElement == null) {
                                logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
                                continue;
                            }
                            String sn = getText(rootElementForCharset, "SN");
                            String channelId = getText(rootElementForCharset, "DeviceID");
                            RecordInfo recordInfo = new RecordInfo();
                            recordInfo.setChannelId(channelId);
                            recordInfo.setDeviceId(take.getDevice().getDeviceId());
                            recordInfo.setSn(sn);
                            recordInfo.setName(getText(rootElementForCharset, "Name"));
                            String sumNumStr = getText(rootElementForCharset, "SumNum");
                            int sumNum = 0;
                            if (!ObjectUtils.isEmpty(sumNumStr)) {
                                sumNum = Integer.parseInt(sumNumStr);
                            }
                            recordInfo.setSumNum(sumNum);
                            Element recordListElement = rootElementForCharset.element("RecordList");
                            if (recordListElement == null || sumNum == 0) {
                                logger.info("无录像数据");
                                eventPublisher.recordEndEventPush(recordInfo);
                                recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
                                releaseRequest(take.getDevice().getDeviceId(), sn);
                            } else {
                                Iterator<Element> recordListIterator = recordListElement.elementIterator();
                                if (recordListIterator != null) {
                                    List<RecordItem> recordList = new ArrayList<>();
                                    // 遍历DeviceList
                                    while (recordListIterator.hasNext()) {
                                        Element itemRecord = recordListIterator.next();
                                        Element recordElement = itemRecord.element("DeviceID");
                                        if (recordElement == null) {
                                            logger.info("记录为空,下一个...");
                                            continue;
                                        }
                                        RecordItem record = new RecordItem();
                                        record.setDeviceId(getText(itemRecord, "DeviceID"));
                                        record.setName(getText(itemRecord, "Name"));
                                        record.setFilePath(getText(itemRecord, "FilePath"));
                                        record.setFileSize(getText(itemRecord, "FileSize"));
                                        record.setAddress(getText(itemRecord, "Address"));
                                        String startTimeStr = getText(itemRecord, "StartTime");
                                        record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
                                        String endTimeStr = getText(itemRecord, "EndTime");
                                        record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
                                        record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
                                                : Integer.parseInt(getText(itemRecord, "Secrecy")));
                                        record.setType(getText(itemRecord, "Type"));
                                        record.setRecorderId(getText(itemRecord, "RecorderID"));
                                        recordList.add(record);
                                    }
                                    recordInfo.setRecordList(recordList);
                                    // 发送消息,如果是上级查询此录像,则会通过这里通知给上级
                                    eventPublisher.recordEndEventPush(recordInfo);
                                    int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
                                    logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
                                }
                                if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
                                    releaseRequest(take.getDevice().getDeviceId(), sn);
                                }
                            }
                        } catch (DocumentException e) {
                            logger.error("xml解析异常: ", e);
                        }
                    }
                    taskQueueHandlerRun = false;
                });
            }
        } catch (SipException | InvalidArgumentException | ParseException e) {
        }catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
        } finally {
            taskQueueHandlerRun = false;
        }
        taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
            taskExecutor.execute(()->{
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
                        Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
                        if (rootElement == null) {
                            logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
                            continue;
                        }
                        String sn = getText(rootElementForCharset, "SN");
                        String channelId = getText(rootElementForCharset, "DeviceID");
                        RecordInfo recordInfo = new RecordInfo();
                        recordInfo.setChannelId(channelId);
                        recordInfo.setDeviceId(take.getDevice().getDeviceId());
                        recordInfo.setSn(sn);
                        recordInfo.setName(getText(rootElementForCharset, "Name"));
                        String sumNumStr = getText(rootElementForCharset, "SumNum");
                        int sumNum = 0;
                        if (!ObjectUtils.isEmpty(sumNumStr)) {
                            sumNum = Integer.parseInt(sumNumStr);
                        }
                        recordInfo.setSumNum(sumNum);
                        Element recordListElement = rootElementForCharset.element("RecordList");
                        if (recordListElement == null || sumNum == 0) {
                            logger.info("无录像数据");
                            eventPublisher.recordEndEventPush(recordInfo);
                            recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
                            releaseRequest(take.getDevice().getDeviceId(), sn);
                        } else {
                            Iterator<Element> recordListIterator = recordListElement.elementIterator();
                            if (recordListIterator != null) {
                                List<RecordItem> recordList = new ArrayList<>();
                                // 遍历DeviceList
                                while (recordListIterator.hasNext()) {
                                    Element itemRecord = recordListIterator.next();
                                    Element recordElement = itemRecord.element("DeviceID");
                                    if (recordElement == null) {
                                        logger.info("记录为空,下一个...");
                                        continue;
                                    }
                                    RecordItem record = new RecordItem();
                                    record.setDeviceId(getText(itemRecord, "DeviceID"));
                                    record.setName(getText(itemRecord, "Name"));
                                    record.setFilePath(getText(itemRecord, "FilePath"));
                                    record.setFileSize(getText(itemRecord, "FileSize"));
                                    record.setAddress(getText(itemRecord, "Address"));
                                    String startTimeStr = getText(itemRecord, "StartTime");
                                    record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
                                    String endTimeStr = getText(itemRecord, "EndTime");
                                    record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
                                    record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
                                            : Integer.parseInt(getText(itemRecord, "Secrecy")));
                                    record.setType(getText(itemRecord, "Type"));
                                    record.setRecorderId(getText(itemRecord, "RecorderID"));
                                    recordList.add(record);
                                }
                                recordInfo.setRecordList(recordList);
                                // 发送消息,如果是上级查询此录像,则会通过这里通知给上级
                                eventPublisher.recordEndEventPush(recordInfo);
                                int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
                                logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
                            }
                            if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
                                releaseRequest(take.getDevice().getDeviceId(), sn);
                            }
                        }
                    } catch (DocumentException e) {
                        logger.error("xml解析异常: ", e);
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }