package com.genersoft.iot.vmp.gb28181.session; 
 | 
  
 | 
import com.genersoft.iot.vmp.gb28181.bean.CatalogData; 
 | 
import com.genersoft.iot.vmp.gb28181.bean.Device; 
 | 
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; 
 | 
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; 
 | 
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 
 | 
import org.springframework.beans.factory.annotation.Autowired; 
 | 
import org.springframework.scheduling.annotation.Scheduled; 
 | 
import org.springframework.stereotype.Component; 
 | 
  
 | 
import java.time.Instant; 
 | 
import java.util.*; 
 | 
import java.util.concurrent.ConcurrentHashMap; 
 | 
import java.util.concurrent.TimeUnit; 
 | 
  
 | 
@Component 
 | 
public class CatalogDataCatch { 
 | 
  
 | 
    public static Map<String, CatalogData> data = new ConcurrentHashMap<>(); 
 | 
  
 | 
    @Autowired 
 | 
    private IVideoManagerStorage storager; 
 | 
  
 | 
    public void addReady(Device device, int sn ) { 
 | 
        CatalogData catalogData = data.get(device.getDeviceId()); 
 | 
        if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { 
 | 
            catalogData = new CatalogData(); 
 | 
            catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>())); 
 | 
            catalogData.setDevice(device); 
 | 
            catalogData.setSn(sn); 
 | 
            catalogData.setStatus(CatalogData.CatalogDataStatus.ready); 
 | 
            catalogData.setLastTime(Instant.now()); 
 | 
            data.put(device.getDeviceId(), catalogData); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) { 
 | 
        CatalogData catalogData = data.get(deviceId); 
 | 
        if (catalogData == null) { 
 | 
            catalogData = new CatalogData(); 
 | 
            catalogData.setSn(sn); 
 | 
            catalogData.setTotal(total); 
 | 
            catalogData.setDevice(device); 
 | 
            catalogData.setChannelList(deviceChannelList); 
 | 
            catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); 
 | 
            catalogData.setLastTime(Instant.now()); 
 | 
            data.put(deviceId, catalogData); 
 | 
        }else { 
 | 
            // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 
 | 
            if (catalogData.getSn() != sn) { 
 | 
                return; 
 | 
            } 
 | 
            catalogData.setTotal(total); 
 | 
            catalogData.setDevice(device); 
 | 
            catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); 
 | 
            catalogData.getChannelList().addAll(deviceChannelList); 
 | 
            catalogData.setLastTime(Instant.now()); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public List<DeviceChannel> get(String deviceId) { 
 | 
        CatalogData catalogData = data.get(deviceId); 
 | 
        if (catalogData == null) { 
 | 
            return null; 
 | 
        } 
 | 
        return catalogData.getChannelList(); 
 | 
    } 
 | 
  
 | 
    public int getTotal(String deviceId) { 
 | 
        CatalogData catalogData = data.get(deviceId); 
 | 
        if (catalogData == null) { 
 | 
            return 0; 
 | 
        } 
 | 
        return catalogData.getTotal(); 
 | 
    } 
 | 
  
 | 
    public SyncStatus getSyncStatus(String deviceId) { 
 | 
        CatalogData catalogData = data.get(deviceId); 
 | 
        if (catalogData == null) { 
 | 
            return null; 
 | 
        } 
 | 
        SyncStatus syncStatus = new SyncStatus(); 
 | 
        syncStatus.setCurrent(catalogData.getChannelList().size()); 
 | 
        syncStatus.setTotal(catalogData.getTotal()); 
 | 
        syncStatus.setErrorMsg(catalogData.getErrorMsg()); 
 | 
        if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { 
 | 
            syncStatus.setSyncIng(false); 
 | 
        }else { 
 | 
            syncStatus.setSyncIng(true); 
 | 
        } 
 | 
        return syncStatus; 
 | 
    } 
 | 
  
 | 
    public boolean isSyncRunning(String deviceId) { 
 | 
        CatalogData catalogData = data.get(deviceId); 
 | 
        if (catalogData == null) { 
 | 
            return false; 
 | 
        } 
 | 
        return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end); 
 | 
    } 
 | 
  
 | 
    @Scheduled(fixedRate = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 
 | 
    private void timerTask(){ 
 | 
        Set<String> keys = data.keySet(); 
 | 
  
 | 
        Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); 
 | 
        Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30)); 
 | 
  
 | 
        for (String deviceId : keys) { 
 | 
            CatalogData catalogData = data.get(deviceId); 
 | 
            if ( catalogData.getLastTime().isBefore(instantBefore5S)) { 
 | 
                // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 
 | 
                if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { 
 | 
                    if (catalogData.getTotal() == catalogData.getChannelList().size()) { 
 | 
                        storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); 
 | 
                    }else { 
 | 
                        storager.updateChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); 
 | 
                    } 
 | 
                    String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; 
 | 
                    catalogData.setErrorMsg(errorMsg); 
 | 
                    if (catalogData.getTotal() != catalogData.getChannelList().size()) { 
 | 
  
 | 
                    } 
 | 
                }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { 
 | 
                    String errorMsg = "同步失败,等待回复超时"; 
 | 
                    catalogData.setErrorMsg(errorMsg); 
 | 
                } 
 | 
                catalogData.setStatus(CatalogData.CatalogDataStatus.end); 
 | 
            } 
 | 
            if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除 
 | 
                data.remove(deviceId); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
  
 | 
    public void setChannelSyncEnd(String deviceId, String errorMsg) { 
 | 
        CatalogData catalogData = data.get(deviceId); 
 | 
        if (catalogData == null) { 
 | 
            return; 
 | 
        } 
 | 
        catalogData.setStatus(CatalogData.CatalogDataStatus.end); 
 | 
        catalogData.setErrorMsg(errorMsg); 
 | 
    } 
 | 
} 
 |