lawrencehj
2021-02-05 6868577e329dea7b48b33a2e5d662ad855edb341
将录像文件统计改为独立线程进行,实现超时返回已接收结果
1个文件已修改
1个文件已添加
193 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java 115 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java
New file
@@ -0,0 +1,78 @@
package com.genersoft.iot.vmp.gb28181.transmit.callback;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.MessageRequestProcessor;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
@SuppressWarnings("unchecked")
public class CheckForAllRecordsThread extends Thread {
    private String key;
    private RecordInfo recordInfo;
    private RedisUtil redis;
    private Logger logger;
    private DeferredResultHolder deferredResultHolder;
    public CheckForAllRecordsThread(String key, RecordInfo recordInfo) {
        this.key = key;
        this.recordInfo = recordInfo;
    }
    public void run() {
        String cacheKey = this.key;
        for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) {
            List<Object> cacheKeys = redis.scan(cacheKey + "_*");
            List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
            for (int i = 0; i < cacheKeys.size(); i++) {
                totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
            }
            if (totalRecordList.size() < this.recordInfo.getSumNum()) {
                logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "项");
            } else {
                logger.info("录像数据已全部获取,共" + this.recordInfo.getSumNum() + "项");
                this.recordInfo.setRecordList(totalRecordList);
                for (int i = 0; i < cacheKeys.size(); i++) {
                    redis.del(cacheKeys.get(i).toString());
                }
                break;
            }
        }
        // 自然顺序排序, 元素进行升序排列
        this.recordInfo.getRecordList().sort(Comparator.naturalOrder());
        RequestMessage msg = new RequestMessage();
        String deviceId = recordInfo.getDeviceId();
        msg.setDeviceId(deviceId);
        msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
        msg.setData(recordInfo);
        deferredResultHolder.invokeResult(msg);
        logger.info("处理完成,返回结果");
        MessageRequestProcessor.threadNameList.remove(cacheKey);
    }
    public void setRedis(RedisUtil redis) {
        this.redis = redis;
    }
    public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
        this.deferredResultHolder = deferredResultHolder;
    }
    public void setLogger(Logger logger) {
        this.logger = logger;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -23,6 +23,7 @@
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -43,13 +44,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
/**
 * @Description:MESSAGE请求处理器
 * @author: swwheihei
 * @date: 2020年5月3日 下午5:32:41
 */
@SuppressWarnings(value={"unchecked", "rawtypes"})
public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
    public static volatile List<String> threadNameList = new ArrayList();
    private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup");
@@ -240,10 +244,10 @@
        try {
            Element rootElement = getRootElement(evt);
            String deviceId = XmlUtil.getText(rootElement, "DeviceID");
            String result = XmlUtil.getText(rootElement, "Result");
            //String result = XmlUtil.getText(rootElement, "Result");
            // 回复200 OK
            responseAck(evt);
            if (!XmlUtil.isEmpty(result)) {
            if (rootElement.getName().equals("Response")) {//} !XmlUtil.isEmpty(result)) {
                // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
@@ -272,11 +276,10 @@
        try {
            Element rootElement = getRootElement(evt);
            String deviceId = XmlUtil.getText(rootElement, "DeviceID");
            String result = XmlUtil.getText(rootElement, "Result");
            // 回复200 OK
            responseAck(evt);
            //if (!XmlUtil.isEmpty(result)) {
                // 此处是对本平台发出DeviceControl指令的应答
            if (rootElement.getName().equals("Response")) {
                    // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
@@ -287,9 +290,9 @@
                msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG);
                msg.setData(json);
                deferredResultHolder.invokeResult(msg);
            // } else {
            //     // 此处是上级发出的DeviceConfig指令
            //}
            } else {
                // 此处是上级发出的DeviceConfig指令
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
@@ -304,11 +307,10 @@
        try {
            Element rootElement = getRootElement(evt);
            String deviceId = XmlUtil.getText(rootElement, "DeviceID");
            String result = XmlUtil.getText(rootElement, "Result");
            // 回复200 OK
            responseAck(evt);
            //if (!XmlUtil.isEmpty(result)) {
                // 此处是对本平台发出DeviceControl指令的应答
            if (rootElement.getName().equals("Response")) {
                    // 此处是对本平台发出DeviceControl指令的应答
                JSONObject json = new JSONObject();
                XmlUtil.node2Json(rootElement, json);
                if (logger.isDebugEnabled()) {
@@ -319,9 +321,9 @@
                msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD);
                msg.setData(json);
                deferredResultHolder.invokeResult(msg);
            // } else {
            //     // 此处是上级发出的DeviceConfig指令
            //}
            } else {
                // 此处是上级发出的DeviceConfig指令
            }
        } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
            e.printStackTrace();
        }
@@ -336,7 +338,6 @@
        try {
            Element rootElement = getRootElement(evt);
            String deviceId = XmlUtil.getText(rootElement, "DeviceID");
            String result = XmlUtil.getText(rootElement, "Result");
            // 回复200 OK
            responseAck(evt);
            if (rootElement.getName().equals("Response")) {//   !XmlUtil.isEmpty(result)) {
@@ -648,8 +649,11 @@
            Element recordListElement = rootElement.element("RecordList");
            if (recordListElement == null || recordInfo.getSumNum() == 0) {
                logger.info("无录像数据");
                // responseAck(evt);
                // return;
                RequestMessage msg = new RequestMessage();
                msg.setDeviceId(deviceId);
                msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
                msg.setData(recordInfo);
                deferredResultHolder.invokeResult(msg);
            } else {
                Iterator<Element> recordListIterator = recordListElement.elementIterator();
                List<RecordItem> recordList = new ArrayList<RecordItem>();
@@ -679,44 +683,63 @@
                        record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID"));
                        recordList.add(record);
                    }
                    // recordList.sort(Comparator.naturalOrder());
                    recordInfo.setRecordList(recordList);
                }
                // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
                if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
                    // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
                    String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
                    redis.set(cacheKey + "_" + uuid, recordList, 90);
                    List<Object> cacheKeys = redis.scan(cacheKey + "_*");
                    List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
                    for (int i = 0; i < cacheKeys.size(); i++) {
                        totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
                // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题
                String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
                redis.set(cacheKey + "_" + uuid, recordList, 90);
                if (!threadNameList.contains(cacheKey)) {
                    threadNameList.add(cacheKey);
                    CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
                    chk.setName(cacheKey);
                    chk.setDeferredResultHolder(deferredResultHolder);
                    chk.setRedis(redis);
                    chk.setLogger(logger);
                    chk.start();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Start Thread " + cacheKey + ".");
                    }
                    if (totalRecordList.size() < recordInfo.getSumNum()) {
                        logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
                        return;
                    }
                    logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
                    recordInfo.setRecordList(totalRecordList);
                    for (int i = 0; i < cacheKeys.size(); i++) {
                        redis.del(cacheKeys.get(i).toString());
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Thread " + cacheKey + " already started.");
                    }
                }
                // 自然顺序排序, 元素进行升序排列
                recordInfo.getRecordList().sort(Comparator.naturalOrder());
                // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
                // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
                //     // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
                //     String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
                //     redis.set(cacheKey + "_" + uuid, recordList, 90);
                //     List<Object> cacheKeys = redis.scan(cacheKey + "_*");
                //     List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
                //     for (int i = 0; i < cacheKeys.size(); i++) {
                //         totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
                //     }
                //     if (totalRecordList.size() < recordInfo.getSumNum()) {
                //         logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
                //         return;
                //     }
                //     logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
                //     recordInfo.setRecordList(totalRecordList);
                //     for (int i = 0; i < cacheKeys.size(); i++) {
                //         redis.del(cacheKeys.get(i).toString());
                //     }
                // }
                // // 自然顺序排序, 元素进行升序排列
                // recordInfo.getRecordList().sort(Comparator.naturalOrder());
            }
            // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作
            // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作
            // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据
            RequestMessage msg = new RequestMessage();
            msg.setDeviceId(deviceId);
            msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
            msg.setData(recordInfo);
            deferredResultHolder.invokeResult(msg);
            logger.info("处理完成,返回结果");
            // RequestMessage msg = new RequestMessage();
            // msg.setDeviceId(deviceId);
            // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
            // msg.setData(recordInfo);
            // deferredResultHolder.invokeResult(msg);
            // logger.info("处理完成,返回结果");
        } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
            e.printStackTrace();
        }
@@ -799,4 +822,4 @@
    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
}
}