From 47abdde3392f2c5fd88d382ae63c4756b97ed4b0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 03 六月 2022 16:24:11 +0800 Subject: [PATCH] 解决设备上线停止线程导致的报错,优化录像的获取以及通道的更新 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java | 151 ++++++++++++++++++++++++++++--------------------- 1 files changed, 86 insertions(+), 65 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 1f70171..87adc3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -1,9 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; -import com.genersoft.iot.vmp.gb28181.bean.RecordItem; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; @@ -19,6 +16,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -28,6 +27,9 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -42,6 +44,9 @@ private final String cmdType = "RecordInfo"; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); + + private boolean taskQueueHandlerRun = false; @Autowired private ResponseMessageHandler responseMessageHandler; @@ -51,10 +56,12 @@ @Autowired private DeferredResultHolder deferredResultHolder; - - @Autowired private EventPublisher eventPublisher; + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { @@ -67,74 +74,88 @@ // 鍥炲200 OK try { responseAck(evt, Response.OK); + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()->{ + try { + while (!taskQueue.isEmpty()) { + HandlerCatchData take = taskQueue.poll(); + Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); + String sn = getText(rootElementForCharset, "SN"); + String channelId = getText(rootElementForCharset, "DeviceID"); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setChannelId(channelId); + recordInfo.setDeviceId(take.getDevice().getDeviceId()); + recordInfo.setSn(sn); + recordInfo.setName(getText(rootElementForCharset, "Name")); + String sumNumStr = getText(rootElementForCharset, "SumNum"); + int sumNum = 0; + if (!StringUtils.isEmpty(sumNumStr)) { + sumNum = Integer.parseInt(sumNumStr); + } + recordInfo.setSumNum(sumNum); + Element recordListElement = rootElementForCharset.element("RecordList"); + if (recordListElement == null || sumNum == 0) { + logger.info("鏃犲綍鍍忔暟鎹�"); + eventPublisher.recordEndEventPush(recordInfo); + recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>()); + releaseRequest(take.getDevice().getDeviceId(), sn); + } else { + Iterator<Element> recordListIterator = recordListElement.elementIterator(); + if (recordListIterator != null) { + List<RecordItem> recordList = new ArrayList<>(); + // 閬嶅巻DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + logger.info("璁板綍涓虹┖锛屼笅涓�涓�..."); + continue; + } + RecordItem record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); + record.setAddress(getText(itemRecord, "Address")); - rootElement = getRootElement(evt, device.getCharset()); - String sn = getText(rootElement, "SN"); - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setDeviceId(device.getDeviceId()); - recordInfo.setSn(sn); - recordInfo.setName(getText(rootElement, "Name")); - String sumNumStr = getText(rootElement, "SumNum"); - int sumNum = 0; - if (!StringUtils.isEmpty(sumNumStr)) { - sumNum = Integer.parseInt(sumNumStr); - } - recordInfo.setSumNum(sumNum); - Element recordListElement = rootElement.element("RecordList"); - if (recordListElement == null || sumNum == 0) { - logger.info("鏃犲綍鍍忔暟鎹�"); - eventPublisher.recordEndEventPush(recordInfo); - recordDataCatch.put(device.getDeviceId(), sn, sumNum, new ArrayList<>()); - releaseRequest(device.getDeviceId(), sn); - } else { - Iterator<Element> recordListIterator = recordListElement.elementIterator(); - if (recordListIterator != null) { - List<RecordItem> recordList = new ArrayList<>(); - // 閬嶅巻DeviceList - while (recordListIterator.hasNext()) { - Element itemRecord = recordListIterator.next(); - Element recordElement = itemRecord.element("DeviceID"); - if (recordElement == null) { - logger.info("璁板綍涓虹┖锛屼笅涓�涓�..."); - continue; + String startTimeStr = getText(itemRecord, "StartTime"); + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); + + String endTimeStr = getText(itemRecord, "EndTime"); + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); + + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); + } + recordInfo.setRecordList(recordList); + // 鍙戦�佹秷鎭紝濡傛灉鏄笂绾ф煡璇㈡褰曞儚锛屽垯浼氶�氳繃杩欓噷閫氱煡缁欎笂绾� + eventPublisher.recordEndEventPush(recordInfo); + int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList); + logger.info("[鍥芥爣褰曞儚]锛� {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); + } + + if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ + releaseRequest(take.getDevice().getDeviceId(), sn); + } + } } - RecordItem record = new RecordItem(); - record.setDeviceId(getText(itemRecord, "DeviceID")); - record.setName(getText(itemRecord, "Name")); - record.setFilePath(getText(itemRecord, "FilePath")); - record.setFileSize(getText(itemRecord, "FileSize")); - record.setAddress(getText(itemRecord, "Address")); - - String startTimeStr = getText(itemRecord, "StartTime"); - record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); - - String endTimeStr = getText(itemRecord, "EndTime"); - record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); - - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 - : Integer.parseInt(getText(itemRecord, "Secrecy"))); - record.setType(getText(itemRecord, "Type")); - record.setRecorderId(getText(itemRecord, "RecorderID")); - recordList.add(record); + taskQueueHandlerRun = false; + }catch (DocumentException e) { + throw new RuntimeException(e); } - recordInfo.setRecordList(recordList); - // 鍙戦�佹秷鎭紝濡傛灉鏄笂绾ф煡璇㈡褰曞儚锛屽垯浼氶�氳繃杩欓噷閫氱煡缁欎笂绾� - eventPublisher.recordEndEventPush(recordInfo); - int count = recordDataCatch.put(device.getDeviceId(), sn, sumNum, recordList); - logger.info("[鍥芥爣褰曞儚]锛� {}->{}: {}/{}", device.getDeviceId(), sn, count, sumNum); - } - - if (recordDataCatch.isComplete(device.getDeviceId(), sn)){ - releaseRequest(device.getDeviceId(), sn); - } + }); } + } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { - e.printStackTrace(); - } catch (DocumentException e) { e.printStackTrace(); } } -- Gitblit v1.8.0