From c2e26291ceb940892b266852a671deda8f2cd952 Mon Sep 17 00:00:00 2001 From: gaofw189 <gaofw189@chinatelecom.cn> Date: 星期二, 07 二月 2023 09:28:55 +0800 Subject: [PATCH] 修复WVP作为下级平台接受recordinfo指令上报上级平台的问题 --- src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java | 37 ++++++++++++++++++++++++++++++------- src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java | 7 ++++++- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java | 8 ++++---- src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java | 5 +++++ 4 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java index 2121db7..04796eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; +import lombok.Data; + import java.time.Instant; import java.util.List; @@ -8,6 +10,7 @@ * @author: swwheihei * @date: 2020骞�5鏈�8鏃� 涓嬪崍2:05:56 */ +@Data public class RecordInfo { private String deviceId; @@ -20,6 +23,8 @@ private int sumNum; + private int count; + private Instant lastTime; private List<RecordItem> recordList; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index 92a4351..cb46823 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -1,8 +1,10 @@ package com.genersoft.iot.vmp.gb28181.event.record; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -20,25 +22,46 @@ private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class); + private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>(); public interface RecordEndEventHandler{ void handler(RecordInfo recordInfo); } - private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>(); - @Override public void onApplicationEvent(RecordEndEvent event) { - logger.info("褰曞儚鏌ヨ瀹屾垚浜嬩欢瑙﹀彂锛宒eviceId锛歿}, channelId: {}, 褰曞儚鏁伴噺{}鏉�", event.getRecordInfo().getDeviceId(), - event.getRecordInfo().getChannelId(), event.getRecordInfo().getSumNum() ); + String deviceId = event.getRecordInfo().getDeviceId(); + String channelId = event.getRecordInfo().getChannelId(); + int count = event.getRecordInfo().getCount(); + int sumNum = event.getRecordInfo().getSumNum(); + logger.info("褰曞儚鏌ヨ瀹屾垚浜嬩欢瑙﹀彂锛宒eviceId锛歿}, channelId: {}, 褰曞儚鏁伴噺{}/{}鏉�", event.getRecordInfo().getDeviceId(), + event.getRecordInfo().getChannelId(), count,sumNum); if (handlerMap.size() > 0) { - for (RecordEndEventHandler recordEndEventHandler : handlerMap.values()) { - recordEndEventHandler.handler(event.getRecordInfo()); + RecordEndEventHandler handler = handlerMap.get(deviceId + channelId); + if (handler !=null){ + handler.handler(event.getRecordInfo()); + if (count ==sumNum){ + handlerMap.remove(deviceId + channelId); + } } } - handlerMap.clear(); } + /** + * 娣诲姞 + * @param device + * @param channelId + * @param recordEndEventHandler + */ public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) { handlerMap.put(device + channelId, recordEndEventHandler); } + /** + * 娣诲姞 + * @param device + * @param channelId + */ + public void delEndEventHandler(String device, String channelId) { + handlerMap.remove(device + channelId); + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java index 1d2b34b..3f24dbe 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -23,14 +24,17 @@ @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired + private RecordEndEventListener recordEndEventListener; - public int put(String deviceId, String sn, int sumNum, List<RecordItem> recordItems) { + public int put(String deviceId,String channelId, String sn, int sumNum, List<RecordItem> recordItems) { String key = deviceId + sn; RecordInfo recordInfo = data.get(key); if (recordInfo == null) { recordInfo = new RecordInfo(); recordInfo.setDeviceId(deviceId); + recordInfo.setChannelId(channelId); recordInfo.setSn(sn.trim()); recordInfo.setSumNum(sumNum); recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); @@ -67,6 +71,7 @@ msg.setKey(msgKey); msg.setData(recordInfo); deferredResultHolder.invokeAllResult(msg); + recordEndEventListener.delEndEventHandler(recordInfo.getDeviceId(),recordInfo.getChannelId()); data.remove(key); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 11d239e..8b4ae2e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -102,8 +102,9 @@ Element recordListElement = rootElementForCharset.element("RecordList"); if (recordListElement == null || sumNum == 0) { logger.info("鏃犲綍鍍忔暟鎹�"); + int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, new ArrayList<>()); + recordInfo.setCount(count); eventPublisher.recordEndEventPush(recordInfo); - recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>()); releaseRequest(take.getDevice().getDeviceId(), sn); } else { Iterator<Element> recordListIterator = recordListElement.elementIterator(); @@ -137,12 +138,11 @@ recordList.add(record); } recordInfo.setRecordList(recordList); + int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, recordList);recordInfo.setCount(count); + logger.info("[鍥芥爣褰曞儚]锛� {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); // 鍙戦�佹秷鎭紝濡傛灉鏄笂绾ф煡璇㈡褰曞儚锛屽垯浼氶�氳繃杩欓噷閫氱煡缁欎笂绾� eventPublisher.recordEndEventPush(recordInfo); - int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList); - logger.info("[鍥芥爣褰曞儚]锛� {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); } - if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ releaseRequest(take.getDevice().getDeviceId(), sn); } -- Gitblit v1.8.0