gaofw189
2023-02-13 39199a1daa2d3faf4a874c57039bc771545a4ba7
优化WVP作为下级平台查询设备录像列表上报缓慢问题
4个文件已修改
1个文件已添加
340 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java 175 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/utils/UJson.java 150 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -138,4 +138,15 @@
    public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
    public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
    /**
     * Redis Const
     * Key prefix for device record-info results
     */
    public static final String REDIS_RECORD_INFO_RES_PRE = "GB_RECORD_INFO_RES_";
    /**
     * Redis Const
     * Key prefix for the running count of device record-info results
     */
    public static final String REDIS_RECORD_INFO_RES_COUNT_PRE = "GB_RECORD_INFO_RES_COUNT:";
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -1,16 +1,17 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.UJson;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,11 +27,9 @@
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@@ -49,9 +48,6 @@
    private ResponseMessageHandler responseMessageHandler;
    @Autowired
    private RecordDataCatch recordDataCatch;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    @Autowired
@@ -61,6 +57,8 @@
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    private Long recordInfoTtl = 1800L;
    @Override
    public void afterPropertiesSet() throws Exception {
        responseMessageHandler.addHandler(cmdType, this);
@@ -68,93 +66,93 @@
    /**
     * Handles a RecordInfo response message from a device: acknowledges the SIP request
     * with 200 OK, parses the record list from the XML body and publishes the result.
     *
     * NOTE(review): this span is a rendered diff hunk. Lines of the previous implementation
     * (taskQueue / recordDataCatch based) and of the new implementation (Redis based, one
     * taskExecutor task per message) appear interleaved without +/- markers, so it does not
     * compile as shown.
     */
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        boolean isEmpty = taskQueue.isEmpty();
        try {
            // Reply 200 OK
             responseAck((SIPRequest) evt.getRequest(), Response.OK);
        }catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
        }
        taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
        if (isEmpty) {
            taskExecutor.execute(()->{
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
                        Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
                        // NOTE(review): this tests the method parameter rootElement, not the
                        // rootElementForCharset parsed on the previous line — confirm intent.
                        if (rootElement == null) {
                            logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
                            continue;
                        }
                        String sn = getText(rootElementForCharset, "SN");
                        String channelId = getText(rootElementForCharset, "DeviceID");
                        RecordInfo recordInfo = new RecordInfo();
                        recordInfo.setChannelId(channelId);
                        recordInfo.setDeviceId(take.getDevice().getDeviceId());
                        recordInfo.setSn(sn);
                        recordInfo.setName(getText(rootElementForCharset, "Name"));
                        String sumNumStr = getText(rootElementForCharset, "SumNum");
                        int sumNum = 0;
                        if (!ObjectUtils.isEmpty(sumNumStr)) {
                            sumNum = Integer.parseInt(sumNumStr);
                        }
                        recordInfo.setSumNum(sumNum);
                        Element recordListElement = rootElementForCharset.element("RecordList");
                        if (recordListElement == null || sumNum == 0) {
                            logger.info("无录像数据");
                            int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, new ArrayList<>());
                            recordInfo.setCount(count);
                            eventPublisher.recordEndEventPush(recordInfo);
                            releaseRequest(take.getDevice().getDeviceId(), sn);
                        } else {
                            Iterator<Element> recordListIterator = recordListElement.elementIterator();
                            if (recordListIterator != null) {
                                List<RecordItem> recordList = new ArrayList<>();
                                // Iterate over the DeviceList
                                while (recordListIterator.hasNext()) {
                                    Element itemRecord = recordListIterator.next();
                                    Element recordElement = itemRecord.element("DeviceID");
                                    if (recordElement == null) {
                                        logger.info("记录为空,下一个...");
                                        continue;
                                    }
                                    RecordItem record = new RecordItem();
                                    record.setDeviceId(getText(itemRecord, "DeviceID"));
                                    record.setName(getText(itemRecord, "Name"));
                                    record.setFilePath(getText(itemRecord, "FilePath"));
                                    record.setFileSize(getText(itemRecord, "FileSize"));
                                    record.setAddress(getText(itemRecord, "Address"));
        taskExecutor.execute(()->{
            try {
                                    String startTimeStr = getText(itemRecord, "StartTime");
                                    record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
                                    String endTimeStr = getText(itemRecord, "EndTime");
                                    record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
                                    record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
                                            : Integer.parseInt(getText(itemRecord, "Secrecy")));
                                    record.setType(getText(itemRecord, "Type"));
                                    record.setRecorderId(getText(itemRecord, "RecorderID"));
                                    recordList.add(record);
                                }
                                recordInfo.setRecordList(recordList);
                                int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, recordList);recordInfo.setCount(count);
                                logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
                                // Publish the event; if a superior platform queried these records, it is notified through this call
                                eventPublisher.recordEndEventPush(recordInfo);
                String sn = getText(rootElement, "SN");
                String channelId = getText(rootElement, "DeviceID");
                RecordInfo recordInfo = new RecordInfo();
                recordInfo.setChannelId(channelId);
                recordInfo.setDeviceId(device.getDeviceId());
                recordInfo.setSn(sn);
                recordInfo.setName(getText(rootElement, "Name"));
                String sumNumStr = getText(rootElement, "SumNum");
                int sumNum = 0;
                if (!ObjectUtils.isEmpty(sumNumStr)) {
                    sumNum = Integer.parseInt(sumNumStr);
                }
                recordInfo.setSumNum(sumNum);
                Element recordListElement = rootElement.element("RecordList");
                if (recordListElement == null || sumNum == 0) {
                    logger.info("无录像数据");
                    recordInfo.setCount(sumNum);
                    eventPublisher.recordEndEventPush(recordInfo);
                    // NOTE(review): no record list is set on recordInfo in this branch, yet
                    // releaseRequest sorts recordInfo.getRecordList() — confirm RecordInfo
                    // initialises the list to a non-null value.
                    releaseRequest(device.getDeviceId(), sn,recordInfo);
                } else {
                    Iterator<Element> recordListIterator = recordListElement.elementIterator();
                    if (recordListIterator != null) {
                        List<RecordItem> recordList = new ArrayList<>();
                        // Iterate over the DeviceList
                        while (recordListIterator.hasNext()) {
                            Element itemRecord = recordListIterator.next();
                            Element recordElement = itemRecord.element("DeviceID");
                            if (recordElement == null) {
                                logger.info("记录为空,下一个...");
                                continue;
                            }
                            if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
                                releaseRequest(take.getDevice().getDeviceId(), sn);
                            }
                            RecordItem record = new RecordItem();
                            record.setDeviceId(getText(itemRecord, "DeviceID"));
                            record.setName(getText(itemRecord, "Name"));
                            record.setFilePath(getText(itemRecord, "FilePath"));
                            record.setFileSize(getText(itemRecord, "FileSize"));
                            record.setAddress(getText(itemRecord, "Address"));
                            String startTimeStr = getText(itemRecord, "StartTime");
                            record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
                            String endTimeStr = getText(itemRecord, "EndTime");
                            record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
                            record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
                                    : Integer.parseInt(getText(itemRecord, "Secrecy")));
                            record.setType(getText(itemRecord, "Type"));
                            record.setRecorderId(getText(itemRecord, "RecorderID"));
                            recordList.add(record);
                        }
                    } catch (DocumentException e) {
                        logger.error("xml解析异常: ", e);
                    } catch (Exception e) {
                        logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                        // Serialise every parsed item to JSON, keyed by startTime + endTime.
                        // NOTE(review): Collectors.toMap without a merge function throws
                        // IllegalStateException on duplicate keys — confirm two items in one
                        // message can never share both start and end time.
                        Map<String, String> map = recordList.stream()
                                .filter(record -> record.getDeviceId() != null)
                                .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
                        // Fetch the task result data
                        // NOTE(review): the key is built from channelId + sn only (no deviceId) — verify this is unique enough.
                        String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
                        RedisUtil.hmset(resKey, map, recordInfoTtl);
                        // Add this message's item count to the running total for channelId + sn
                        String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
                        long incr = RedisUtil.incr(resCountKey, map.size());
                        RedisUtil.expire(resCountKey, recordInfoTtl);
                        recordInfo.setRecordList(recordList);
                        recordInfo.setCount(Math.toIntExact(incr));
                        eventPublisher.recordEndEventPush(recordInfo);
                        // Running total has not reached SumNum yet; wait for further messages
                        if (incr < sumNum) {
                            return;
                        }
                        // All records received
                        List<RecordItem> resList = RedisUtil.hmget(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
                        // The hash holds fewer entries than SumNum; do not release the request yet
                        if (resList.size() < sumNum) {
                            return;
                        }
                        recordInfo.setRecordList(resList);
                        releaseRequest(device.getDeviceId(), sn,recordInfo);
                    }
                }
            });
        }
            } catch (Exception e) {
                logger.error("[国标录像] 发现未处理的异常, "+e.getMessage(), e);
            }
        });
    }
    @Override
@@ -162,15 +160,14 @@
    }
    /**
     * Completes the pending record-info request identified by deviceId + sn: sorts the
     * record list, wraps the result in a RequestMessage keyed by
     * DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn and passes it to
     * deferredResultHolder.invokeAllResult.
     *
     * NOTE(review): this span is a rendered diff hunk; the two-argument and the
     * three-argument variants of the method appear interleaved without +/- markers.
     */
    public void releaseRequest(String deviceId, String sn){
    public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){
        String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
        // Sort the data
        Collections.sort(recordDataCatch.getRecordInfo(deviceId, sn).getRecordList());
        // NOTE(review): Collections.sort throws NullPointerException when given null —
        // confirm every caller passes a recordInfo whose record list has been set.
        Collections.sort(recordInfo.getRecordList());
        RequestMessage msg = new RequestMessage();
        msg.setKey(key);
        msg.setData(recordDataCatch.getRecordInfo(deviceId, sn));
        msg.setData(recordInfo);
        deferredResultHolder.invokeAllResult(msg);
        recordDataCatch.remove(deviceId, sn);
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
@@ -80,8 +80,8 @@
     */
    @Override
    public void process(ResponseEvent evt ){
        logger.debug("接收到消息:" + evt.getResponse());
        try {
            SIPResponse response = (SIPResponse)evt.getResponse();
            int statusCode = response.getStatusCode();
            // trying不会回复
src/main/java/com/genersoft/iot/vmp/utils/UJson.java
New file
@@ -0,0 +1,150 @@
package com.genersoft.iot.vmp.utils;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
/**
 * @author gaofuwang
 * @version 1.0
 * @date 2022/3/11 10:17
 */
public class UJson {
    private static Logger logger = LoggerFactory.getLogger(UJson.class);
    public static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    static {
        JSON_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
    }
    private ObjectNode node;
    public UJson(){
        this.node = JSON_MAPPER.createObjectNode();
    }
    public UJson(String json){
        if(StringUtils.isBlank(json)){
            this.node = JSON_MAPPER.createObjectNode();
        }else{
            try {
                this.node = JSON_MAPPER.readValue(json, ObjectNode.class);
            }catch (Exception e){
                logger.error(e.getMessage(), e);
                this.node = JSON_MAPPER.createObjectNode();
            }
        }
    }
    public UJson(ObjectNode node){
        this.node = node;
    }
    public String asText(String key){
        JsonNode jsonNode = node.get(key);
        if(Objects.isNull(jsonNode)){
            return "";
        }
        return jsonNode.asText();
    }
    public String asText(String key, String defaultVal){
        JsonNode jsonNode = node.get(key);
        if(Objects.isNull(jsonNode)){
            return "";
        }
        return jsonNode.asText(defaultVal);
    }
    public UJson put(String key, String value){
        this.node.put(key, value);
        return this;
    }
    public UJson put(String key, Integer value){
        this.node.put(key, value);
        return this;
    }
    public static UJson json(){
        return new UJson();
    }
    public static UJson json(String json){
        return new UJson(json);
    }
    public static <T> T readJson(String json, Class<T> clazz){
        if(StringUtils.isBlank(json)){
            return null;
        }
        try {
            return JSON_MAPPER.readValue(json, clazz);
        }catch (Exception e){
            logger.error(e.getMessage(), e);
            return null;
        }
    }
    public static String writeJson(Object object) {
        try{
            return JSON_MAPPER.writeValueAsString(object);
        }catch (Exception e){
            logger.error(e.getMessage(), e);
            return "";
        }
    }
    @Override
    public String toString() {
        return node.toString();
    }
    public int asInt(String key, int defValue) {
        JsonNode jsonNode = this.node.get(key);
        if(Objects.isNull(jsonNode)){
            return defValue;
        }
        return jsonNode.asInt(defValue);
    }
    public UJson getSon(String key) {
        JsonNode sonNode = this.node.get(key);
        if(Objects.isNull(sonNode)){
            return new UJson();
        }
        return new UJson((ObjectNode) sonNode);
    }
    public UJson set(String key, ObjectNode sonNode) {
        this.node.set(key, sonNode);
        return this;
    }
    public UJson set(String key, UJson sonNode) {
        this.node.set(key, sonNode.node);
        return this;
    }
    public Iterator<Map.Entry<String, JsonNode>> fields() {
        return this.node.fields();
    }
    public ObjectNode getNode() {
        return this.node;
    }
    public UJson setAll(UJson json) {
        this.node.setAll(json.node);
        return this;
    }
}
src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java
@@ -238,7 +238,7 @@
     * @param time 时间
     * @return true / false
     */
    public static boolean hmset(String key, Map<Object, Object> map, long time) {
    public static boolean hmset(String key, Map<?, ?> map, long time) {
        if (redisTemplate == null) {
            redisTemplate = SpringBeanFactory.getBean("redisTemplate");
        }