package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.Instant; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author lin */ @Component public class RecordDataCatch { public static Map data = new ConcurrentHashMap<>(); @Autowired private DeferredResultHolder deferredResultHolder; public int put(String deviceId, String sn, int sumNum, List recordItems) { String key = deviceId + sn; RecordInfo recordInfo = data.get(key); if (recordInfo == null) { recordInfo = new RecordInfo(); recordInfo.setDeviceId(deviceId); recordInfo.setSn(sn.trim()); recordInfo.setSumNum(sumNum); recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); recordInfo.setLastTime(Instant.now()); recordInfo.getRecordList().addAll(recordItems); data.put(key, recordInfo); }else { // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 if (!Objects.equals(sn.trim(), recordInfo.getSn())) { return 0; } recordInfo.getRecordList().addAll(recordItems); recordInfo.setLastTime(Instant.now()); } return recordInfo.getRecordList().size(); } @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set keys = data.keySet(); // 获取五秒前的时刻 Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); for (String key : keys) { RecordInfo recordInfo = data.get(key); // 超过五秒收不到消息任务超时, 只更新这一部分数据 if ( recordInfo.getLastTime().isBefore(instantBefore5S)) { // 处理录像数据, 返回给前端 String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn(); WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(0); wvpResult.setMsg("success"); // 对数据进行排序 Collections.sort(recordInfo.getRecordList()); wvpResult.setData(recordInfo); RequestMessage msg = new RequestMessage(); msg.setKey(msgKey); msg.setData(wvpResult); deferredResultHolder.invokeAllResult(msg); data.remove(key); } } } public boolean isComplete(String deviceId, String sn) { RecordInfo recordInfo = data.get(deviceId + sn); return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum(); } public RecordInfo getRecordInfo(String deviceId, String sn) { return data.get(deviceId + sn); } public void remove(String deviceId, String sn) { data.remove(deviceId + sn); } }