New file |
| | |
| | | package com.genersoft.iot.vmp.gb28181.session; |
| | | |
| | | import com.genersoft.iot.vmp.gb28181.bean.*; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; |
| | | import com.genersoft.iot.vmp.vmanager.bean.WVPResult; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.time.Instant; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class RecordDataCatch { |
| | | |
| | | public static Map<String, RecordInfo> data = new ConcurrentHashMap<>(); |
| | | |
| | | @Autowired |
| | | private DeferredResultHolder deferredResultHolder; |
| | | |
| | | |
| | | public int put(String deviceId, String sn, int sumNum, List<RecordItem> recordItems) { |
| | | String key = deviceId + sn; |
| | | RecordInfo recordInfo = data.get(key); |
| | | if (recordInfo == null) { |
| | | recordInfo = new RecordInfo(); |
| | | recordInfo.setDeviceId(deviceId); |
| | | recordInfo.setSn(sn.trim()); |
| | | recordInfo.setSumNum(sumNum); |
| | | recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); |
| | | recordInfo.setLastTime(Instant.now()); |
| | | recordInfo.getRecordList().addAll(recordItems); |
| | | data.put(key, recordInfo); |
| | | }else { |
| | | // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 |
| | | if (!Objects.equals(sn.trim(), recordInfo.getSn())) { |
| | | return 0; |
| | | } |
| | | recordInfo.getRecordList().addAll(recordItems); |
| | | recordInfo.setLastTime(Instant.now()); |
| | | } |
| | | return recordInfo.getRecordList().size(); |
| | | } |
| | | |
| | | @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 |
| | | private void timerTask(){ |
| | | Set<String> keys = data.keySet(); |
| | | // 获取五秒前的时刻 |
| | | Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); |
| | | for (String key : keys) { |
| | | RecordInfo recordInfo = data.get(key); |
| | | // 超过五秒收不到消息任务超时, 只更新这一部分数据 |
| | | if ( recordInfo.getLastTime().isBefore(instantBefore5S)) { |
| | | // 处理录像数据, 返回给前端 |
| | | String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn(); |
| | | |
| | | WVPResult<RecordInfo> wvpResult = new WVPResult<>(); |
| | | wvpResult.setCode(0); |
| | | wvpResult.setMsg("success"); |
| | | // 对数据进行排序 |
| | | Collections.sort(recordInfo.getRecordList()); |
| | | wvpResult.setData(recordInfo); |
| | | |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setKey(msgKey); |
| | | msg.setData(wvpResult); |
| | | deferredResultHolder.invokeAllResult(msg); |
| | | data.remove(key); |
| | | } |
| | | } |
| | | } |
| | | |
| | | public boolean isComplete(String deviceId, String sn) { |
| | | RecordInfo recordInfo = data.get(deviceId + sn); |
| | | return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum(); |
| | | } |
| | | |
| | | public RecordInfo getRecordInfo(String deviceId, String sn) { |
| | | return data.get(deviceId + sn); |
| | | } |
| | | |
| | | public void remove(String deviceId, String sn) { |
| | | data.remove(deviceId + sn); |
| | | } |
| | | } |