648540858
2023-02-22 019d95cfba07f0231bd5b385f6736883c6e90623
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -1,16 +1,17 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.UJson;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,11 +27,9 @@
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@@ -49,9 +48,6 @@
    private ResponseMessageHandler responseMessageHandler;
    @Autowired
    private RecordDataCatch recordDataCatch;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    @Autowired
@@ -61,6 +57,8 @@
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    // Expiry value handed to RedisUtil.hmset (record-info result hash) and RedisUtil.expire
    // (record-info counter key) in handForDevice. Unit is presumably seconds (1800 = 30 min)
    // — TODO confirm against RedisUtil.
    private Long recordInfoTtl = 1800L;
    @Override
    public void afterPropertiesSet() throws Exception {
        responseMessageHandler.addHandler(cmdType, this);
@@ -68,44 +66,34 @@
    /**
     * Handles a record-info response message coming from a device.
     *
     * <p>Visible flow: acknowledge the SIP request with 200 OK (a failure is only logged), enqueue
     * the message on {@code taskQueue}, and, when the queue was empty before the enqueue, submit a
     * task to {@code taskExecutor} that drains the queue. For each message the task reads the
     * {@code SN}, {@code DeviceID}, {@code Name}, {@code SumNum} and {@code RecordList} elements,
     * fills a {@link RecordInfo}, publishes it through {@code eventPublisher.recordEndEventPush}
     * and calls {@code releaseRequest}.
     *
     * <p>NOTE(review): many statements below appear twice, once reading from the polled queue item
     * / {@code recordDataCatch} and once reading from the method arguments / Redis. This looks like
     * both sides of a change interleaved, and part of the record-item loop is missing at the
     * {@code @@} marker. Confirm which variant is intended before relying on this text.
     *
     * @param evt         request event; its request is acknowledged and used in log output
     * @param device      device the message is attributed to (device id is read from it)
     * @param rootElement XML root element of the message
     */
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
        // Captured before the offer below: only the caller that found the queue empty starts a drain task.
        boolean isEmpty = taskQueue.isEmpty();
        try {
            // Reply 200 OK
             responseAck((SIPRequest) evt.getRequest(), Response.OK);
        }catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
        }
        taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
        if (isEmpty) {
            taskExecutor.execute(()->{
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
                        // Re-parse the root element of the polled request using the device's charset.
                        Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
                        // NOTE(review): this tests the method parameter rootElement (and logs the outer evt),
                        // not rootElementForCharset / take.getEvt() obtained just above — confirm intended.
                        if (rootElement == null) {
                            logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
                            continue;
                        }
                        String sn = getText(rootElementForCharset, "SN");
                        String channelId = getText(rootElementForCharset, "DeviceID");
                String sn = getText(rootElement, "SN");
                String channelId = getText(rootElement, "DeviceID");
                        RecordInfo recordInfo = new RecordInfo();
                        recordInfo.setChannelId(channelId);
                        recordInfo.setDeviceId(take.getDevice().getDeviceId());
                recordInfo.setDeviceId(device.getDeviceId());
                        recordInfo.setSn(sn);
                        recordInfo.setName(getText(rootElementForCharset, "Name"));
                        String sumNumStr = getText(rootElementForCharset, "SumNum");
                recordInfo.setName(getText(rootElement, "Name"));
                String sumNumStr = getText(rootElement, "SumNum");
                        // SumNum defaults to 0 when the element is empty.
                        int sumNum = 0;
                        if (!ObjectUtils.isEmpty(sumNumStr)) {
                            sumNum = Integer.parseInt(sumNumStr);
                        }
                        recordInfo.setSumNum(sumNum);
                        Element recordListElement = rootElementForCharset.element("RecordList");
                Element recordListElement = rootElement.element("RecordList");
                        // No RecordList element or a zero total: publish and release immediately.
                        if (recordListElement == null || sumNum == 0) {
                            logger.info("无录像数据");
                            int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, new ArrayList<>());
                            recordInfo.setCount(count);
                    recordInfo.setCount(sumNum);
                            eventPublisher.recordEndEventPush(recordInfo);
                            releaseRequest(take.getDevice().getDeviceId(), sn);
                    releaseRequest(device.getDeviceId(), sn,recordInfo);
                        } else {
                            Iterator<Element> recordListIterator = recordListElement.elementIterator();
                            if (recordListIterator != null) {
@@ -137,24 +125,34 @@
                                    record.setRecorderId(getText(itemRecord, "RecorderID"));
                                    recordList.add(record);
                                }
                        // Key each record by start time + end time; value is UJson.writeJson(record).
                        // NOTE(review): Collectors.toMap without a merge function throws
                        // IllegalStateException on duplicate keys — confirm duplicates cannot occur.
                        Map<String, String> map = recordList.stream()
                                .filter(record -> record.getDeviceId() != null)
                                .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
                        // Fetch the task result data
                        String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
                        RedisUtil.hmset(resKey, map, recordInfoTtl);
                        String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
                        long incr = RedisUtil.incr(resCountKey, map.size());
                        RedisUtil.expire(resCountKey, recordInfoTtl);
                                recordInfo.setRecordList(recordList);
                                int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, recordList);recordInfo.setCount(count);
                                logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
                                // Publish the event; if an upstream platform queried this recording, it is notified through here
                        recordInfo.setCount(Math.toIntExact(incr));
                                eventPublisher.recordEndEventPush(recordInfo);
                        // NOTE(review): inside the executor lambda, return ends the whole drain task,
                        // not just this iteration — confirm against the intended variant.
                        if (incr < sumNum) {
                            return;
                            }
                            if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
                                releaseRequest(take.getDevice().getDeviceId(), sn);
                        // All records received
                        List<RecordItem> resList = RedisUtil.hmget(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
                        if (resList.size() < sumNum) {
                            return;
                        }
                        recordInfo.setRecordList(resList);
                        releaseRequest(device.getDeviceId(), sn,recordInfo);
                            }
                        }
                    } catch (DocumentException e) {
                        logger.error("xml解析异常: ", e);
                    } catch (Exception e) {
                        logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
                    }
                logger.error("[国标录像] 发现未处理的异常, "+e.getMessage(), e);
                }
            });
        }
    }
    @Override
@@ -162,15 +160,14 @@
    }
    /**
     * Completes the pending record-info callback for a device/SN pair.
     *
     * <p>Builds the callback key from {@code DeferredResultHolder.CALLBACK_CMD_RECORDINFO},
     * {@code deviceId} and {@code sn}, sorts the record list, wraps the record info in a
     * {@link RequestMessage} and passes it to {@code deferredResultHolder.invokeAllResult}.
     *
     * <p>NOTE(review): two signatures and paired statements are interleaved below — one variant
     * reads the record info from {@code recordDataCatch} (and removes it afterwards), the other
     * uses the {@code recordInfo} argument. This looks like both sides of a change; confirm which
     * is intended. Also confirm {@code getRecordList()} cannot be null when sorting.
     *
     * @param deviceId   device id, used as part of the callback key
     * @param sn         SN string, used as part of the callback key
     * @param recordInfo (three-argument variant) record info whose list is sorted and sent as the
     *                   message data
     */
    public void releaseRequest(String deviceId, String sn){
    public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){
        String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
        // Sort the data
        Collections.sort(recordDataCatch.getRecordInfo(deviceId, sn).getRecordList());
        Collections.sort(recordInfo.getRecordList());
        RequestMessage msg = new RequestMessage();
        msg.setKey(key);
        msg.setData(recordDataCatch.getRecordInfo(deviceId, sn));
        msg.setData(recordInfo);
        deferredResultHolder.invokeAllResult(msg);
        recordDataCatch.remove(deviceId, sn);
    }
}