648540858
2022-04-17 0dc1807f621ce9077b58dff40ad4485c9a3d6c40
优化通道同步添加对SN的判断,精简代码
11个文件已修改
227 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/SyncChannelProgress.vue 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
@@ -4,6 +4,7 @@
import java.util.List;
public class CatalogData {
    private int sn; // 命令序列号
    private int total;
    private List<DeviceChannel> channelList;
    private Date lastTime;
@@ -15,6 +16,15 @@
    }
    private CatalogDataStatus status;
    public int getSn() {
        return sn;
    }
    public void setSn(int sn) {
        this.sn = sn;
    }
    public int getTotal() {
        return total;
    }
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -54,6 +54,7 @@
    @Autowired
    private SIPCommander cmder;
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
@@ -76,7 +77,7 @@
            if (deviceInStore == null) { //第一次上线
                logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
                cmder.deviceInfoQuery(device);
                cmder.catalogQuery(device, null);
                deviceService.sync(device);
            }
            break;
        // 设备主动发送心跳触发的在线事件
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -26,28 +26,35 @@
    @Autowired
    private IVideoManagerStorage storager;
    public void addReady(String key) {
        CatalogData catalogData = data.get(key);
    public void addReady(Device device, int sn ) {
        CatalogData catalogData = data.get(device.getDeviceId());
        if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
            catalogData = new CatalogData();
            catalogData.setChannelList(new ArrayList<>());
            catalogData.setDevice(device);
            catalogData.setSn(sn);
            catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
            catalogData.setLastTime(new Date(System.currentTimeMillis()));
            data.put(key, catalogData);
            data.put(device.getDeviceId(), catalogData);
        }
    }
    public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
        CatalogData catalogData = data.get(key);
    public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) {
            catalogData = new CatalogData();
            catalogData.setSn(sn);
            catalogData.setTotal(total);
            catalogData.setDevice(device);
            catalogData.setChannelList(new ArrayList<>());
            catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
            catalogData.setLastTime(new Date(System.currentTimeMillis()));
            data.put(key, catalogData);
            data.put(deviceId, catalogData);
        }else {
            // 同一个设备的通道同步请求只考虑一个,其他的直接忽略
            if (catalogData.getSn() != sn) {
                return;
            }
            catalogData.setTotal(total);
            catalogData.setDevice(device);
            catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
@@ -56,30 +63,26 @@
        }
    }
    public List<DeviceChannel> get(String key) {
        CatalogData catalogData = data.get(key);
    public List<DeviceChannel> get(String deviceId) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) return null;
        return catalogData.getChannelList();
    }
    public int getTotal(String key) {
        CatalogData catalogData = data.get(key);
    public int getTotal(String deviceId) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) return 0;
        return catalogData.getTotal();
    }
    public SyncStatus getSyncStatus(String key) {
        CatalogData catalogData = data.get(key);
    public SyncStatus getSyncStatus(String deviceId) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null) return null;
        SyncStatus syncStatus = new SyncStatus();
        syncStatus.setCurrent(catalogData.getChannelList().size());
        syncStatus.setTotal(catalogData.getTotal());
        syncStatus.setErrorMsg(catalogData.getErrorMsg());
        return syncStatus;
    }
    public void del(String key) {
        data.remove(key);
    }
    @Scheduled(fixedRate = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
@@ -92,23 +95,30 @@
        Calendar calendarBefore30S = Calendar.getInstance();
        calendarBefore30S.setTime(new Date());
        calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
        for (String key : keys) {
            CatalogData catalogData = data.get(key);
            if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
                storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
                String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
        for (String deviceId : keys) {
            CatalogData catalogData = data.get(deviceId);
            if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
                if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
                    storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
                    if (catalogData.getTotal() != catalogData.getChannelList().size()) {
                        String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
                        catalogData.setErrorMsg(errorMsg);
                    }
                }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
                    String errorMsg = "同步失败,等待回复超时";
                    catalogData.setErrorMsg(errorMsg);
                }
                catalogData.setStatus(CatalogData.CatalogDataStatus.end);
                catalogData.setErrorMsg(errorMsg);
            }
            if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
                data.remove(key);
            if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
                data.remove(deviceId);
            }
        }
    }
    public void setChannelSyncEnd(String key, String errorMsg) {
        CatalogData catalogData = data.get(key);
    public void setChannelSyncEnd(String deviceId, String errorMsg) {
        CatalogData catalogData = data.get(deviceId);
        if (catalogData == null)return;
        catalogData.setStatus(CatalogData.CatalogDataStatus.end);
        catalogData.setErrorMsg(errorMsg);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -250,7 +250,7 @@
     * 
     * @param device 视频设备
     */
    boolean catalogQuery(Device device, SipSubscribe.Event errorEvent);
    boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent);
    
    /**
     * 查询录像信息
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1208,14 +1208,14 @@
     * @param device 视频设备
     */ 
    @Override
    public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
    public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) {
        try {
            StringBuffer catalogXml = new StringBuffer(200);
            String charset = device.getCharset();
            catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
            catalogXml.append("<Query>\r\n");
            catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
            catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
            catalogXml.append("<SN>" + sn + "</SN>\r\n");
            catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
            catalogXml.append("</Query>\r\n");
            
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -86,23 +86,17 @@
            rootElement = getRootElement(evt, device.getCharset());
            Element deviceListElement = rootElement.element("DeviceList");
            Element sumNumElement = rootElement.element("SumNum");
            if (sumNumElement == null || deviceListElement == null) {
            Element snElement = rootElement.element("SN");
            if (snElement == null || sumNumElement == null || deviceListElement == null) {
                responseAck(evt, Response.BAD_REQUEST, "xml error");
                return;
            }
            int sumNum = Integer.parseInt(sumNumElement.getText());
            if (sumNum == 0) {
                // 数据已经完整接收
                storager.cleanChannelsForDevice(device.getDeviceId());
                RequestMessage msg = new RequestMessage();
                msg.setKey(key);
                WVPResult<Object> result = new WVPResult<>();
                result.setCode(0);
                result.setData(device);
                msg.setData(result);
                result.setMsg("更新成功,共0条");
                deferredResultHolder.invokeAllResult(msg);
                catalogDataCatch.del(key);
                catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
            }else {
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
@@ -123,24 +117,18 @@
                        channelList.add(deviceChannel);
                    }
                    int sn = Integer.parseInt(snElement.getText());
                    logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum);
                    catalogDataCatch.put(key, sumNum, device, channelList);
                    if (catalogDataCatch.get(key).size() == sumNum) {
                    catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
                    if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
                        // 数据已经完整接收
                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
                        RequestMessage msg = new RequestMessage();
                        msg.setKey(key);
                        WVPResult<Object> result = new WVPResult<>();
                        result.setCode(0);
                        result.setData(device);
                        if (resetChannelsResult || sumNum ==0) {
                            result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
                        if (!resetChannelsResult) {
                            String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
                        }else {
                            result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
                        }
                        msg.setData(result);
                        deferredResultHolder.invokeAllResult(msg);
                        catalogDataCatch.del(key);
                    }
                }
                // 回复200 OK
@@ -228,21 +216,18 @@
    }
    public SyncStatus getChannelSyncProgress(String deviceId) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
        if (catalogDataCatch.get(key) == null) {
        if (catalogDataCatch.get(deviceId) == null) {
            return null;
        }else {
            return catalogDataCatch.getSyncStatus(key);
            return catalogDataCatch.getSyncStatus(deviceId);
        }
    }
    public void setChannelSyncReady(String deviceId) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
        catalogDataCatch.addReady(key);
    public void setChannelSyncReady(Device device, int sn) {
        catalogDataCatch.addReady(device, sn);
    }
    public void setChannelSyncEnd(String deviceId, String errorMsg) {
        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
        catalogDataCatch.setChannelSyncEnd(key, errorMsg);
        catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg);
    }
}
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -44,15 +44,8 @@
    SyncStatus getChannelSyncStatus(String deviceId);
    /**
     * 设置通道同步状态
     * @param deviceId 设备ID
     * 通道同步
     * @param device
     */
    void setChannelSyncReady(String deviceId);
    /**
     * 设置同步结束
     * @param deviceId 设备ID
     * @param errorMsg 错误信息
     */
    void setChannelSyncEnd(String deviceId, String errorMsg);
    void sync(Device device);
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -100,12 +100,16 @@
    }
    @Override
    public void setChannelSyncReady(String deviceId) {
        catalogResponseMessageHandler.setChannelSyncReady(deviceId);
    }
    @Override
    public void setChannelSyncEnd(String deviceId, String errorMsg) {
        catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg);
    public void sync(Device device) {
        if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) {
            logger.info("开启同步时发现同步已经存在");
            return;
        }
        int sn = (int)((Math.random()*9+1)*100000);
        catalogResponseMessageHandler.setChannelSyncReady(device, sn);
        sipCommander.catalogQuery(device, sn, event -> {
            String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
            catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
        });
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -238,12 +238,15 @@
    @Override
    public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
        if (deviceChannelList == null) {
            return false;
        }
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        // 数据去重
        List<DeviceChannel> channels = new ArrayList<>();
        StringBuilder stringBuilder = new StringBuilder();
        Map<String, Integer> subContMap = new HashMap<>();
        if (deviceChannelList.size() > 1) {
        if (deviceChannelList != null && deviceChannelList.size() > 1) {
            // 数据去重
            Set<String> gbIdSet = new HashSet<>();
            for (DeviceChannel deviceChannel : deviceChannelList) {
@@ -300,6 +303,7 @@
            dataSourceTransactionManager.commit(transactionStatus);     //手动提交
            return true;
        }catch (Exception e) {
            e.printStackTrace();
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
        }
@@ -415,10 +419,9 @@
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        boolean result = false;
        try {
            if (platformChannelMapper.delChannelForDeviceId(deviceId) <0  // 删除与国标平台的关联
                    || deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道
                    || deviceMapper.del(deviceId) < 0 // 移除设备信息
            ) {
            platformChannelMapper.delChannelForDeviceId(deviceId);
            deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
            if ( deviceMapper.del(deviceId) < 0 ) {
                //事务回滚
                dataSourceTransactionManager.rollback(transactionStatus);
            }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -172,12 +172,8 @@
            wvpResult.setData(syncStatus);
            return wvpResult;
        }
        SyncStatus syncStatusReady = new SyncStatus();
        deviceService.setChannelSyncReady(deviceId);
        cmder.catalogQuery(device, event -> {
            String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
            deviceService.setChannelSyncEnd(deviceId, errorMsg);
        });
        deviceService.sync(device);
        WVPResult<SyncStatus> wvpResult = new WVPResult<>();
        wvpResult.setCode(0);
        wvpResult.setMsg("开始同步");
web_src/src/components/dialog/SyncChannelProgress.vue
@@ -61,23 +61,36 @@
          if (!this.syncFlag) {
            this.syncFlag = true;
          }
          if (res.data.data == null) {
            this.syncStatus = "success"
            this.percentage = 100;
            this.msg = '同步成功';
          }else if (res.data.data.total == 0){
            this.msg = `等待同步中`;
            this.timmer = setTimeout(this.getProgress, 300)
          }else if (res.data.data.errorMsg !== null ){
            this.msg = res.data.data.errorMsg;
            this.syncStatus = "exception"
          }else {
            this.total = res.data.data.total;
            this.current = res.data.data.current;
            this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
            this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
            this.timmer = setTimeout(this.getProgress, 300)
          if (res.data.data != null) {
            if (res.data.data.total == 0) {
              if (res.data.data.errorMsg !== null ){
                this.msg = res.data.data.errorMsg;
                this.syncStatus = "exception"
              }else {
                this.msg = `等待同步中`;
                this.timmer = setTimeout(this.getProgress, 300)
              }
            }else  {
              if (res.data.data.total == res.data.data.current) {
                this.syncStatus = "success"
                this.percentage = 100;
                this.msg = '同步成功';
              }else {
                if (res.data.data.errorMsg !== null ){
                  this.msg = res.data.data.errorMsg;
                  this.syncStatus = "exception"
                }else {
                  this.total = res.data.data.total;
                  this.current = res.data.data.current;
                  this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
                  this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
                  this.timmer = setTimeout(this.getProgress, 300)
                }
              }
            }
          }
        }else {
          if (this.syncFlag) {
            this.syncStatus = "success"