| package com.genersoft.iot.vmp.gb28181.session; | 
|   | 
| import com.genersoft.iot.vmp.gb28181.bean.*; | 
| import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; | 
| import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; | 
| import com.genersoft.iot.vmp.vmanager.bean.WVPResult; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.scheduling.annotation.Scheduled; | 
| import org.springframework.stereotype.Component; | 
|   | 
| import java.time.Instant; | 
| import java.util.*; | 
| import java.util.concurrent.ConcurrentHashMap; | 
| import java.util.concurrent.TimeUnit; | 
|   | 
| /** | 
|  * @author lin | 
|  */ | 
| @Component | 
| public class RecordDataCatch { | 
|   | 
|     public static Map<String, RecordInfo> data = new ConcurrentHashMap<>(); | 
|   | 
|     @Autowired | 
|     private DeferredResultHolder deferredResultHolder; | 
|   | 
|   | 
|     public int put(String deviceId, String sn, int sumNum, List<RecordItem> recordItems) { | 
|         String key = deviceId + sn; | 
|         RecordInfo recordInfo = data.get(key); | 
|         if (recordInfo == null) { | 
|             recordInfo = new RecordInfo(); | 
|             recordInfo.setDeviceId(deviceId); | 
|             recordInfo.setSn(sn.trim()); | 
|             recordInfo.setSumNum(sumNum); | 
|             recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); | 
|             recordInfo.setLastTime(Instant.now()); | 
|             recordInfo.getRecordList().addAll(recordItems); | 
|             data.put(key, recordInfo); | 
|         }else { | 
|             // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 | 
|             if (!Objects.equals(sn.trim(), recordInfo.getSn())) { | 
|                 return 0; | 
|             } | 
|             recordInfo.getRecordList().addAll(recordItems); | 
|             recordInfo.setLastTime(Instant.now()); | 
|         } | 
|         return recordInfo.getRecordList().size(); | 
|     } | 
|   | 
|     @Scheduled(fixedRate = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 | 
|     private void timerTask(){ | 
|         Set<String> keys = data.keySet(); | 
|         // 获取五秒前的时刻 | 
|         Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); | 
|         for (String key : keys) { | 
|             RecordInfo recordInfo = data.get(key); | 
|             // 超过五秒收不到消息任务超时, 只更新这一部分数据 | 
|             if ( recordInfo.getLastTime().isBefore(instantBefore5S)) { | 
|                 // 处理录像数据, 返回给前端 | 
|                 String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn(); | 
|   | 
|                 WVPResult<RecordInfo> wvpResult = new WVPResult<>(); | 
|                 wvpResult.setCode(0); | 
|                 wvpResult.setMsg("success"); | 
|                 // 对数据进行排序 | 
|                 Collections.sort(recordInfo.getRecordList()); | 
|                 wvpResult.setData(recordInfo); | 
|   | 
|                 RequestMessage msg = new RequestMessage(); | 
|                 msg.setKey(msgKey); | 
|                 msg.setData(wvpResult); | 
|                 deferredResultHolder.invokeAllResult(msg); | 
|                 data.remove(key); | 
|             } | 
|         } | 
|     } | 
|   | 
|     public boolean isComplete(String deviceId, String sn) { | 
|         RecordInfo recordInfo = data.get(deviceId + sn); | 
|         return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum(); | 
|     } | 
|   | 
|     public RecordInfo getRecordInfo(String deviceId, String sn) { | 
|         return data.get(deviceId + sn); | 
|     } | 
|   | 
|     public void remove(String deviceId, String sn) { | 
|         data.remove(deviceId + sn); | 
|     } | 
| } |