package com.genersoft.iot.vmp.gb28181.session;
|
|
import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
|
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.time.Instant;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.TimeUnit;
|
|
@Component
|
public class CatalogDataCatch {
|
|
public static Map<String, CatalogData> data = new ConcurrentHashMap<>();
|
|
@Autowired
|
private IVideoManagerStorage storager;
|
|
public void addReady(Device device, int sn ) {
|
CatalogData catalogData = data.get(device.getDeviceId());
|
if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
|
catalogData = new CatalogData();
|
catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>()));
|
catalogData.setDevice(device);
|
catalogData.setSn(sn);
|
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
|
catalogData.setLastTime(Instant.now());
|
data.put(device.getDeviceId(), catalogData);
|
}
|
}
|
|
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) {
|
CatalogData catalogData = data.get(deviceId);
|
if (catalogData == null) {
|
catalogData = new CatalogData();
|
catalogData.setSn(sn);
|
catalogData.setTotal(total);
|
catalogData.setDevice(device);
|
catalogData.setChannelList(deviceChannelList);
|
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
catalogData.setLastTime(Instant.now());
|
data.put(deviceId, catalogData);
|
}else {
|
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
|
if (catalogData.getSn() != sn) {
|
return;
|
}
|
catalogData.setTotal(total);
|
catalogData.setDevice(device);
|
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
catalogData.getChannelList().addAll(deviceChannelList);
|
catalogData.setLastTime(Instant.now());
|
}
|
}
|
|
public List<DeviceChannel> get(String deviceId) {
|
CatalogData catalogData = data.get(deviceId);
|
if (catalogData == null) {
|
return null;
|
}
|
return catalogData.getChannelList();
|
}
|
|
public int getTotal(String deviceId) {
|
CatalogData catalogData = data.get(deviceId);
|
if (catalogData == null) {
|
return 0;
|
}
|
return catalogData.getTotal();
|
}
|
|
public SyncStatus getSyncStatus(String deviceId) {
|
CatalogData catalogData = data.get(deviceId);
|
if (catalogData == null) {
|
return null;
|
}
|
SyncStatus syncStatus = new SyncStatus();
|
syncStatus.setCurrent(catalogData.getChannelList().size());
|
syncStatus.setTotal(catalogData.getTotal());
|
syncStatus.setErrorMsg(catalogData.getErrorMsg());
|
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
|
syncStatus.setSyncIng(false);
|
}else {
|
syncStatus.setSyncIng(true);
|
}
|
return syncStatus;
|
}
|
|
public boolean isSyncRunning(String deviceId) {
|
CatalogData catalogData = data.get(deviceId);
|
if (catalogData == null) {
|
return false;
|
}
|
return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
|
}
|
|
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
private void timerTask(){
|
Set<String> keys = data.keySet();
|
|
Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
|
Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30));
|
|
for (String deviceId : keys) {
|
CatalogData catalogData = data.get(deviceId);
|
if ( catalogData.getLastTime().isBefore(instantBefore5S)) {
|
// 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作
|
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
|
if (catalogData.getTotal() == catalogData.getChannelList().size()) {
|
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
|
}else {
|
storager.updateChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
|
}
|
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
|
catalogData.setErrorMsg(errorMsg);
|
if (catalogData.getTotal() != catalogData.getChannelList().size()) {
|
|
}
|
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
|
String errorMsg = "同步失败,等待回复超时";
|
catalogData.setErrorMsg(errorMsg);
|
}
|
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
}
|
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除
|
data.remove(deviceId);
|
}
|
}
|
}
|
|
|
public void setChannelSyncEnd(String deviceId, String errorMsg) {
|
CatalogData catalogData = data.get(deviceId);
|
if (catalogData == null) {
|
return;
|
}
|
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
catalogData.setErrorMsg(errorMsg);
|
}
|
}
|