From 47abdde3392f2c5fd88d382ae63c4756b97ed4b0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 03 六月 2022 16:24:11 +0800 Subject: [PATCH] 解决设备上线停止线程导致的报错,优化录像的获取以及通道的更新 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java | 141 ++++++++++++++++++++++++++++------------------ 1 files changed, 86 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 891b21d..0fe317a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -20,6 +20,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -31,6 +33,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; @Component public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { @@ -38,8 +41,12 @@ private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); private final String cmdType = "Catalog"; + private boolean taskQueueHandlerRun = false; + @Autowired private ResponseMessageHandler responseMessageHandler; + + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private IVideoManagerStorage storager; @@ -63,6 +70,10 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -70,68 +81,88 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element element) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getDeviceId(); - Element rootElement = null; + taskQueue.offer(new HandlerCatchData(evt, device, element)); + // 鍥炲200 OK try { - rootElement = getRootElement(evt, device.getCharset()); - Element deviceListElement = rootElement.element("DeviceList"); - Element sumNumElement = rootElement.element("SumNum"); - Element snElement = rootElement.element("SN"); - if (snElement == null || sumNumElement == null || deviceListElement == null) { - responseAck(evt, Response.BAD_REQUEST, "xml error"); - return; - } - int sumNum = Integer.parseInt(sumNumElement.getText()); - - if (sumNum == 0) { - // 鏁版嵁宸茬粡瀹屾暣鎺ユ敹 - storager.cleanChannelsForDevice(device.getDeviceId()); - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); - }else { - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List<DeviceChannel> channelList = new ArrayList<>(); - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; + responseAck(evt, Response.OK); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + HandlerCatchData take = taskQueue.poll(); + String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId(); + Element rootElement = null; + try { + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); + Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + Element snElement = rootElement.element("SN"); + if (snElement == null || sumNumElement == null || deviceListElement == null) { + responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error"); + return; } - //by brewswang -// if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//濡傛灉鍖呭惈浣嶇疆淇℃伅锛屽氨鏇存柊涓�涓嬩綅缃� -// processNotifyMobilePosition(evt, itemDevice); -// } - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); - deviceChannel.setDeviceId(device.getDeviceId()); + int sumNum = Integer.parseInt(sumNumElement.getText()); - channelList.add(deviceChannel); - } - int sn = Integer.parseInt(snElement.getText()); - catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList); - logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾: {}涓紝{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(device.getDeviceId()) == null ? 0 :catalogDataCatch.get(device.getDeviceId()).size(), sumNum); - if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) { - // 鏁版嵁宸茬粡瀹屾暣鎺ユ敹 - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId())); - if (!resetChannelsResult) { - String errorMsg = "鎺ユ敹鎴愬姛锛屽啓鍏ュけ璐ワ紝鍏�" + sumNum + "鏉★紝宸叉帴鏀�" + catalogDataCatch.get(device.getDeviceId()).size() + "鏉�"; - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg); + if (sumNum == 0) { + // 鏁版嵁宸茬粡瀹屾暣鎺ユ敹 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); }else { - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); + Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List<DeviceChannel> channelList = new ArrayList<>(); + // 閬嶅巻DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + //by brewswang + // if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//濡傛灉鍖呭惈浣嶇疆淇℃伅锛屽氨鏇存柊涓�涓嬩綅缃� + // processNotifyMobilePosition(evt, itemDevice); + // } + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); + deviceChannel.setDeviceId(take.getDevice().getDeviceId()); + + channelList.add(deviceChannel); + } + int sn = Integer.parseInt(snElement.getText()); + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); + logger.info("鏀跺埌鏉ヨ嚜璁惧銆恵}銆戠殑閫氶亾: {}涓紝{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { + // 鏁版嵁宸茬粡瀹屾暣鎺ユ敹 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); + if (!resetChannelsResult) { + String errorMsg = "鎺ユ敹鎴愬姛锛屽啓鍏ュけ璐ワ紝鍏�" + sumNum + "鏉★紝宸叉帴鏀�" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "鏉�"; + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); + }else { + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } + } + } + } + } catch (DocumentException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } catch (SipException e) { + e.printStackTrace(); } } - // 鍥炲200 OK - responseAck(evt, Response.OK); - } - } catch (DocumentException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } catch (SipException e) { - e.printStackTrace(); + taskQueueHandlerRun = false; + }); + } } -- Gitblit v1.8.0