src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
@@ -4,6 +4,7 @@ import java.util.List; public class CatalogData { private int sn; // 命令序列号 private int total; private List<DeviceChannel> channelList; private Date lastTime; @@ -15,6 +16,15 @@ } private CatalogDataStatus status; public int getSn() { return sn; } public void setSn(int sn) { this.sn = sn; } public int getTotal() { return total; } src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -54,6 +54,7 @@ @Autowired private SIPCommander cmder; private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override @@ -76,7 +77,7 @@ if (deviceInStore == null) { //第一次上线 logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); cmder.deviceInfoQuery(device); cmder.catalogQuery(device, null); deviceService.sync(device); } break; // 设备主动发送心跳触发的在线事件 src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -26,28 +26,35 @@ @Autowired private IVideoManagerStorage storager; public void addReady(String key) { CatalogData catalogData = data.get(key); public void addReady(Device device, int sn ) { CatalogData catalogData = data.get(device.getDeviceId()); if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { catalogData = new CatalogData(); catalogData.setChannelList(new ArrayList<>()); catalogData.setDevice(device); catalogData.setSn(sn); catalogData.setStatus(CatalogData.CatalogDataStatus.ready); catalogData.setLastTime(new Date(System.currentTimeMillis())); data.put(key, catalogData); data.put(device.getDeviceId(), catalogData); } } public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) { CatalogData catalogData = data.get(key); public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) { CatalogData catalogData = data.get(deviceId); if (catalogData == null) { catalogData = new CatalogData(); catalogData.setSn(sn); catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setChannelList(new ArrayList<>()); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.setLastTime(new Date(System.currentTimeMillis())); data.put(key, catalogData); data.put(deviceId, catalogData); }else { // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 if (catalogData.getSn() != sn) { return; } catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); @@ -56,30 +63,26 @@ } } public List<DeviceChannel> get(String key) { CatalogData catalogData = data.get(key); public List<DeviceChannel> get(String deviceId) { CatalogData catalogData = data.get(deviceId); if (catalogData == null) return null; return catalogData.getChannelList(); } public int getTotal(String key) { CatalogData catalogData = data.get(key); public int getTotal(String deviceId) { CatalogData catalogData = data.get(deviceId); if (catalogData == null) return 0; return catalogData.getTotal(); } public SyncStatus getSyncStatus(String key) { CatalogData catalogData = data.get(key); public SyncStatus getSyncStatus(String deviceId) { CatalogData catalogData = data.get(deviceId); if (catalogData == null) return null; SyncStatus syncStatus = new SyncStatus(); syncStatus.setCurrent(catalogData.getChannelList().size()); syncStatus.setTotal(catalogData.getTotal()); syncStatus.setErrorMsg(catalogData.getErrorMsg()); return syncStatus; } public void del(String key) { data.remove(key); } @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 @@ -92,23 +95,30 @@ Calendar calendarBefore30S = Calendar.getInstance(); calendarBefore30S.setTime(new Date()); calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30); for (String key : keys) { CatalogData catalogData = data.get(key); if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; for (String deviceId : keys) { CatalogData catalogData = data.get(deviceId); if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); if (catalogData.getTotal() != catalogData.getChannelList().size()) { String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; catalogData.setErrorMsg(errorMsg); } }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { String errorMsg = "同步失败,等待回复超时"; catalogData.setErrorMsg(errorMsg); } catalogData.setStatus(CatalogData.CatalogDataStatus.end); catalogData.setErrorMsg(errorMsg); } if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 data.remove(key); if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 data.remove(deviceId); } } } public void setChannelSyncEnd(String key, String errorMsg) { CatalogData catalogData = data.get(key); public void setChannelSyncEnd(String deviceId, String errorMsg) { CatalogData catalogData = data.get(deviceId); if (catalogData == null)return; catalogData.setStatus(CatalogData.CatalogDataStatus.end); catalogData.setErrorMsg(errorMsg); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -250,7 +250,7 @@ * * @param device 视频设备 */ boolean catalogQuery(Device device, SipSubscribe.Event errorEvent); boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent); /** * 查询录像信息 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1208,14 +1208,14 @@ * @param device 视频设备 */ @Override public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) { try { StringBuffer catalogXml = new StringBuffer(200); String charset = device.getCharset(); catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n"); catalogXml.append("<Query>\r\n"); catalogXml.append("<CmdType>Catalog</CmdType>\r\n"); catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); catalogXml.append("<SN>" + sn + "</SN>\r\n"); catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n"); catalogXml.append("</Query>\r\n"); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -86,23 +86,17 @@ rootElement = getRootElement(evt, device.getCharset()); Element deviceListElement = rootElement.element("DeviceList"); Element sumNumElement = rootElement.element("SumNum"); if (sumNumElement == null || deviceListElement == null) { Element snElement = rootElement.element("SN"); if (snElement == null || sumNumElement == null || deviceListElement == null) { responseAck(evt, Response.BAD_REQUEST, "xml error"); return; } int sumNum = Integer.parseInt(sumNumElement.getText()); if (sumNum == 0) { // 数据已经完整接收 storager.cleanChannelsForDevice(device.getDeviceId()); RequestMessage msg = new RequestMessage(); msg.setKey(key); WVPResult<Object> result = new WVPResult<>(); result.setCode(0); result.setData(device); msg.setData(result); result.setMsg("更新成功,共0条"); deferredResultHolder.invokeAllResult(msg); catalogDataCatch.del(key); catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); }else { Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { @@ -123,24 +117,18 @@ channelList.add(deviceChannel); } int sn = Integer.parseInt(snElement.getText()); logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum); catalogDataCatch.put(key, sumNum, device, channelList); if (catalogDataCatch.get(key).size() == sumNum) { catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList); if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) { // 数据已经完整接收 boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); RequestMessage msg = new RequestMessage(); msg.setKey(key); WVPResult<Object> result = new WVPResult<>(); result.setCode(0); result.setData(device); if (resetChannelsResult || sumNum ==0) { result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId())); if (!resetChannelsResult) { String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条"; catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg); }else { result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); } msg.setData(result); deferredResultHolder.invokeAllResult(msg); catalogDataCatch.del(key); } } // 回复200 OK @@ -228,21 +216,18 @@ } public SyncStatus getChannelSyncProgress(String deviceId) { String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; if (catalogDataCatch.get(key) == null) { if (catalogDataCatch.get(deviceId) == null) { return null; }else { return catalogDataCatch.getSyncStatus(key); return catalogDataCatch.getSyncStatus(deviceId); } } public void setChannelSyncReady(String deviceId) { String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; catalogDataCatch.addReady(key); public void setChannelSyncReady(Device device, int sn) { catalogDataCatch.addReady(device, sn); } public void setChannelSyncEnd(String deviceId, String errorMsg) { String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; catalogDataCatch.setChannelSyncEnd(key, errorMsg); catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg); } } src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -44,15 +44,8 @@ SyncStatus getChannelSyncStatus(String deviceId); /** * 设置通道同步状态 * @param deviceId 设备ID * 通道同步 * @param device */ void setChannelSyncReady(String deviceId); /** * 设置同步结束 * @param deviceId 设备ID * @param errorMsg 错误信息 */ void setChannelSyncEnd(String deviceId, String errorMsg); void sync(Device device); } src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -100,12 +100,16 @@ } @Override public void setChannelSyncReady(String deviceId) { catalogResponseMessageHandler.setChannelSyncReady(deviceId); } @Override public void setChannelSyncEnd(String deviceId, String errorMsg) { catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg); public void sync(Device device) { if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) { logger.info("开启同步时发现同步已经存在"); return; } int sn = (int)((Math.random()*9+1)*100000); catalogResponseMessageHandler.setChannelSyncReady(device, sn); sipCommander.catalogQuery(device, sn, event -> { String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg); }); } } src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -238,12 +238,15 @@ @Override public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) { if (deviceChannelList == null) { return false; } TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); // 数据去重 List<DeviceChannel> channels = new ArrayList<>(); StringBuilder stringBuilder = new StringBuilder(); Map<String, Integer> subContMap = new HashMap<>(); if (deviceChannelList.size() > 1) { if (deviceChannelList != null && deviceChannelList.size() > 1) { // 数据去重 Set<String> gbIdSet = new HashSet<>(); for (DeviceChannel deviceChannel : deviceChannelList) { @@ -300,6 +303,7 @@ dataSourceTransactionManager.commit(transactionStatus); //手动提交 return true; }catch (Exception e) { e.printStackTrace(); dataSourceTransactionManager.rollback(transactionStatus); return false; } @@ -415,10 +419,9 @@ TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); boolean result = false; try { if (platformChannelMapper.delChannelForDeviceId(deviceId) <0 // 删除与国标平台的关联 || deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道 || deviceMapper.del(deviceId) < 0 // 移除设备信息 ) { platformChannelMapper.delChannelForDeviceId(deviceId); deviceChannelMapper.cleanChannelsByDeviceId(deviceId); if ( deviceMapper.del(deviceId) < 0 ) { //事务回滚 dataSourceTransactionManager.rollback(transactionStatus); } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -172,12 +172,8 @@ wvpResult.setData(syncStatus); return wvpResult; } SyncStatus syncStatusReady = new SyncStatus(); deviceService.setChannelSyncReady(deviceId); cmder.catalogQuery(device, event -> { String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); deviceService.setChannelSyncEnd(deviceId, errorMsg); }); deviceService.sync(device); WVPResult<SyncStatus> wvpResult = new WVPResult<>(); wvpResult.setCode(0); wvpResult.setMsg("开始同步"); web_src/src/components/dialog/SyncChannelProgress.vue
@@ -61,23 +61,36 @@ if (!this.syncFlag) { this.syncFlag = true; } if (res.data.data == null) { this.syncStatus = "success" this.percentage = 100; this.msg = '同步成功'; }else if (res.data.data.total == 0){ this.msg = `等待同步中`; this.timmer = setTimeout(this.getProgress, 300) }else if (res.data.data.errorMsg !== null ){ this.msg = res.data.data.errorMsg; this.syncStatus = "exception" }else { this.total = res.data.data.total; this.current = res.data.data.current; this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100; this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`; this.timmer = setTimeout(this.getProgress, 300) if (res.data.data != null) { if (res.data.data.total == 0) { if (res.data.data.errorMsg !== null ){ this.msg = res.data.data.errorMsg; this.syncStatus = "exception" }else { this.msg = `等待同步中`; this.timmer = setTimeout(this.getProgress, 300) } }else { if (res.data.data.total == res.data.data.current) { this.syncStatus = "success" this.percentage = 100; this.msg = '同步成功'; }else { if (res.data.data.errorMsg !== null ){ this.msg = res.data.data.errorMsg; this.syncStatus = "exception" }else { this.total = res.data.data.total; this.current = res.data.data.current; this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100; this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`; this.timmer = setTimeout(this.getProgress, 300) } } } } }else { if (this.syncFlag) { this.syncStatus = "success"