648540858
2022-06-03 47abdde3392f2c5fd88d382ae63c4756b97ed4b0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -20,6 +20,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -31,6 +33,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
@@ -38,8 +41,12 @@
    private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
    // Command type under which this handler registers itself with responseMessageHandler.
    private final String cmdType = "Catalog";
    // Set to true by handForDevice before it submits the queue-draining task, and set back
    // to false by that task once it finishes its loop.
    // NOTE(review): plain (non-volatile) boolean that is read and written both from
    // handForDevice and from the executor task — confirm visibility/atomicity is acceptable.
    private boolean taskQueueHandlerRun = false;
    @Autowired
    private ResponseMessageHandler responseMessageHandler;
    // Pending catalog responses: handForDevice offers entries, the executor task polls them.
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private IVideoManagerStorage storager;
@@ -63,6 +70,10 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    // Executor bean named "taskExecutor"; handForDevice uses it to run the task that
    // drains taskQueue.
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    /**
     * Registers this handler with {@code responseMessageHandler} under {@code cmdType}
     * ("Catalog").
     * NOTE(review): the remainder of this method is not visible in this listing.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        responseMessageHandler.addHandler(cmdType, this);
@@ -70,23 +81,39 @@
    /**
     * Handles a Catalog response message received from a device.
     *
     * As visible in this listing: the message is wrapped in a HandlerCatchData and offered to
     * taskQueue, a 200 OK is sent back, and, when taskQueueHandlerRun is false, a task is
     * submitted to taskExecutor that polls the queue and processes each entry (parsing the
     * DeviceList / SumNum / SN elements and updating catalogDataCatch and storager).
     *
     * NOTE(review): this listing appears to interleave pre-change and post-change lines of a
     * diff (statements using evt/device directly are followed by equivalents using take), and
     * some regions are elided at the hunk markers — read the comments below with that in mind.
     *
     * @param evt     the SIP request event carrying the response message
     * @param device  the device the message came from
     * @param element XML element passed through to HandlerCatchData
     */
    @Override
    public void handForDevice(RequestEvent evt, Device device, Element element) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getDeviceId();
        // Queue the message so it is processed by the executor task instead of the caller.
        taskQueue.offer(new HandlerCatchData(evt, device, element));
        // Reply 200 OK
        try {
            responseAck(evt, Response.OK);
        } catch (SipException e) {
            // Checked exceptions from responseAck are rethrown unchecked, cause preserved.
            throw new RuntimeException(e);
        } catch (InvalidArgumentException e) {
            throw new RuntimeException(e);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        // Start a draining task only when none is flagged as running.
        // NOTE(review): the flag is checked and then set without synchronization here, so two
        // concurrent callers could both pass the check — confirm whether that is acceptable.
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
            taskExecutor.execute(()-> {
                // Drain the queue until it is empty.
                while (!taskQueue.isEmpty()) {
                    HandlerCatchData take = taskQueue.poll();
                    String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId();
        Element rootElement = null;
        try {
            rootElement = getRootElement(evt, device.getCharset());
                        rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
            Element deviceListElement = rootElement.element("DeviceList");
            Element sumNumElement = rootElement.element("SumNum");
            Element snElement = rootElement.element("SN");
            // All three elements are required; otherwise answer BAD_REQUEST and stop.
            // NOTE(review): a 200 OK was already sent above before queuing, so this looks like
            // a second response to the same request — confirm.
            // NOTE(review): inside the executor lambda this return appears to leave the task
            // without reaching "taskQueueHandlerRun = false" below, which would keep later
            // messages from ever starting a new draining task — confirm against the full file.
            if (snElement == null || sumNumElement == null || deviceListElement == null) {
                responseAck(evt, Response.BAD_REQUEST, "xml error");
                            responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error");
                return;
            }
            int sumNum = Integer.parseInt(sumNumElement.getText());
            if (sumNum == 0) {
                // All data has been received
                storager.cleanChannelsForDevice(device.getDeviceId());
                catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
                            storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
                            catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
            }else {
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
@@ -103,26 +130,25 @@
//                            processNotifyMobilePosition(evt, itemDevice);
//                        }
                        DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
                        deviceChannel.setDeviceId(device.getDeviceId());
                                    deviceChannel.setDeviceId(take.getDevice().getDeviceId());
                        channelList.add(deviceChannel);
                    }
                    int sn = Integer.parseInt(snElement.getText());
                    // Store this batch of channels under the device id, then compare the number
                    // of cached channels with SumNum to decide whether the sync is complete.
                    catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
                    logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(device.getDeviceId()) == null ? 0 :catalogDataCatch.get(device.getDeviceId()).size(), sumNum);
                    if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
                                catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
                                logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
                                if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
                        // All data has been received
                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
                                    boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
                        // Mark the sync as ended, with an error message when the write failed.
                        if (!resetChannelsResult) {
                            String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
                                        String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";
                                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
                        }else {
                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
                                        catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
                        }
                    }
                }
                // Reply 200 OK
                responseAck(evt, Response.OK);
            }
        } catch (DocumentException e) {
            e.printStackTrace();
@@ -134,6 +160,11 @@
            e.printStackTrace();
        }
    }
                // Queue drained: clear the flag so a later message can start a new task.
                // NOTE(review): an entry offered after the final isEmpty() check but before
                // this reset would presumably wait until the next message arrives — confirm.
                taskQueueHandlerRun = false;
            });
        }
    }
    @Override
    public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element rootElement) {