package com.tievd.jyz.handler; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.tievd.jyz.cache.DeviceCache; import com.tievd.jyz.constants.SystemConstant; import com.tievd.jyz.entity.Camera; import com.tievd.jyz.entity.Device; import com.tievd.jyz.entity.OilPosition; import com.tievd.jyz.mapper.CameraMapper; import com.tievd.jyz.mapper.DeviceMapper; import com.tievd.jyz.mapper.OilPositionMapper; import com.tievd.jyz.mqtt.PojoDataConverter; import com.tievd.jyz.mqtt.command.BaseUpdatePullCommand; import com.tievd.jyz.mqtt.command.BaseUpdatePushCommand; import com.tievd.jyz.mqtt.command.MqttCommandReceiver; import com.tievd.jyz.mqtt.dto.*; import com.tievd.jyz.util.BeanUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.*; /** * 基础数据同步处理器 * * @author timi */ @Slf4j @Component public class BaseDataSyncHandler { @Autowired private CameraMapper cameraMapper; @Autowired private DeviceMapper deviceMapper; @Autowired private OilPositionMapper oilPositionMapper; @Autowired private MqttCommandReceiver mqttCommandReceiver; @Value("${init.sendCommandTopic:/ty/aibox/command/}") private String sendCommandTopic; /** * 基础数据核验操作 * * @param mqttParamDTO */ public void baseUpdateCheck(MqttParamDTO mqttParamDTO) { String deviceSN = mqttParamDTO.getSn(); Device device = deviceMapper.selectOne(new QueryWrapper().eq("sn", deviceSN)); if (ObjectUtil.isNull(device)) { log.error("平台未找到网关设备,操作失败,SN:{}", deviceSN); return; } String pullDataSource = StringUtils.EMPTY; String pushDataSource = StringUtils.EMPTY; Date localCameraTime = cameraMapper.findMaxChangeTime(device.getId()); Date localOilPositionTime = oilPositionMapper.findMaxChangeTime(device.getOrgCode()); Long aiBoxCameraUpTime = ((JSONObject) mqttParamDTO.getData()).getLong("cameraUpTime"); Long aiBoxOilPositionUpTime = ((JSONObject) mqttParamDTO.getData()).getLong("oilPositionUpTime"); if (ObjectUtil.isNull(localCameraTime) || localCameraTime.getTime() < aiBoxCameraUpTime) { //终端平台更新时间小于设备更新时间,获取终端 log.info("终端平台更新时间小于设备更新时间,获取终端,deviceSN:{}",deviceSN); pullDataSource = SystemConstant.CAMERA_REQ_SOURCE; } else { //终端平台更新时间大于设备更新时间,下发终端 log.info("终端平台更新时间大于设备更新时间,下发终端(已废弃)"); pushDataSource = SystemConstant.CAMERA_REQ_SOURCE; } if (ObjectUtil.isNull(localOilPositionTime) || localOilPositionTime.getTime() < aiBoxOilPositionUpTime) { //加油位平台更新时间小于设备更新时间,获取加油位 log.info("加油位平台更新时间小于设备更新时间,获取加油位,deviceSN:{}",deviceSN); pullDataSource = pullDataSource.equals(SystemConstant.CAMERA_REQ_SOURCE) ? SystemConstant.ALL_REQ_SOURCE : SystemConstant.OIL_REQ_SOURCE; } else { //加油位平台更新时间大于设备更新时间,下发加油位(暂时不下发加油位,加油位只单向同步) //pushDataSource = pushDataSource.equals(SystemConstant.CAMERA_REQ_SOURCE) ? SystemConstant.ALL_REQ_SOURCE : SystemConstant.OIL_REQ_SOURCE; } if (StringUtils.isNotEmpty(pushDataSource)) { //推送数据 //屏蔽推终端逻辑(改成单向同步,设备-》平台) //pushFullDataToDevice(pushDataSource, device); } pullData(pullDataSource,deviceSN); } /** * 拉取/同步基础数据 * @param pullDataSource 拉取数据类型:camera:终端; oil:加油区;all:所有(目前包括终端和加油区); * SystemConstant.CAMERA_REQ_SOURCE * SystemConstant.OIL_REQ_SOURCE * SystemConstant.ALL_REQ_SOURCE * @param deviceSN 网关序列号 */ public void pullData(String pullDataSource,String deviceSN){ if (StringUtils.isNotEmpty(pullDataSource)) { //拉取数据 BaseUpdatePullCommand baseUpdatePullCommand = new BaseUpdatePullCommand(mqttCommandReceiver, pullDataSource, sendCommandTopic, deviceSN); if (baseUpdatePullCommand.init() == SystemConstant.DEAL_FAIL) { log.error("基础数据拉取指令配置失败"); } if (baseUpdatePullCommand.execute() == SystemConstant.DEAL_FAIL) { log.error("基础数据拉取指令发送失败"); } } } /** * 上报的基础数据更新 */ @Transactional(rollbackFor = RuntimeException.class) public void baseInfoUpdate(MqttParamDTO mqttParamDTO) { String deviceSN = mqttParamDTO.getSn(); Device device = deviceMapper.selectOne(new QueryWrapper().eq("sn", deviceSN)); if (ObjectUtil.isNull(device)) { log.error("平台未找到网关设备,操作失败,SN:{}", deviceSN); return; } SyncBaseUpdateDTO syncBaseUpdateDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), SyncBaseUpdateDTO.class); if (SystemConstant.FULL_MODE_REQ_TYPE.equals(syncBaseUpdateDTO.getMode())) { //全量更新 List cameras = syncBaseUpdateDTO.getExist().getCameras(); if (ObjectUtil.isNotEmpty(cameras)) { syncFullCameraInfo(cameras, device); } List oilPositions = syncBaseUpdateDTO.getExist().getOilPositions(); if (ObjectUtil.isNotEmpty(oilPositions)) { syncFullOilInfo(oilPositions, device); } } else { //增量更新 List addOrUpdateCameras = syncBaseUpdateDTO.getExist().getCameras(); List delCameras = syncBaseUpdateDTO.getDel().getCameras(); syncIncCameraInfo(addOrUpdateCameras, delCameras, device); List addOrUpdateOilDTOs = syncBaseUpdateDTO.getExist().getOilPositions(); List delOilDTOs = syncBaseUpdateDTO.getDel().getOilPositions(); syncIncOilInfo(addOrUpdateOilDTOs, delOilDTOs, device); } } /** * 增量同步终端信息 * * @param addOrUpdateCameras * @param delCameras */ private void syncIncCameraInfo(List addOrUpdateCameras, List delCameras, Device device) { if(ObjectUtil.isNotEmpty(addOrUpdateCameras)){ for (CameraDTO cameraDTO : addOrUpdateCameras) { if(StringUtils.isEmpty(cameraDTO.getCode())){ continue; } Camera camera = cameraMapper.selectOne(new QueryWrapper().eq("code", cameraDTO.getCode()).eq("device_id", device.getId())); Camera camera1 = PojoDataConverter.cameraConvert(cameraDTO); if (ObjectUtil.isNull(camera)) { //新增 camera1.setCreateTime(new Date()); camera1.setDeviceId(device.getId()); camera1.setOrgCode(device.getOrgCode()); camera1.setCreateTime(camera1.getUpdateTime()); cameraMapper.insert(camera1); } else { //更新 BeanUtil.copyPropertiesIgnoreNull(camera1, camera); cameraMapper.updateById(camera); } } } if(ObjectUtil.isNotEmpty(delCameras)){ for (CameraDTO cameraDTO : delCameras) { if(StringUtils.isEmpty(cameraDTO.getCode())){ continue; } cameraMapper.delete(new QueryWrapper().eq("code", cameraDTO.getCode()).eq("device_id", device.getId())); } } } /** * 增量同步加油位信息 * * @param addOrUpdateOils * @param delOils */ private void syncIncOilInfo(List addOrUpdateOils, List delOils, Device device) { if(ObjectUtil.isNotEmpty(addOrUpdateOils)){ for (OilPositionDTO oilPositionDTO : addOrUpdateOils) { OilPosition oilPosition = oilPositionMapper.selectOne(new QueryWrapper().eq("position_code", oilPositionDTO.getCode()).eq("org_code", device.getOrgCode()).eq("device_id", device.getId())); OilPosition oilPosition1 = PojoDataConverter.oilPositionConvert(oilPositionDTO); if (ObjectUtil.isNull(oilPosition)) { //新增 oilPosition1.setOrgCode(device.getOrgCode()); oilPosition1.setDeviceId(device.getId()); oilPositionMapper.insert(oilPosition1); } else { //更新 BeanUtil.copyPropertiesIgnoreNull(oilPosition1, oilPosition); oilPositionMapper.updateById(oilPosition); } } } if(ObjectUtil.isNotEmpty(delOils)){ for (OilPositionDTO oilPositionDTO : delOils) { oilPositionMapper.delete(new QueryWrapper().eq("position_code", oilPositionDTO.getCode()).eq("org_code", device.getOrgCode())); } } } /** * 全量同步终端数据 * * @param newCameras * @param device */ private void syncFullCameraInfo(List newCameras, Device device) { if (ObjectUtil.isNotEmpty(newCameras)) { List currentCameraList = cameraMapper.selectList(new QueryWrapper().eq("device_id", device.getId())); for (CameraDTO cameraDTO : newCameras) { boolean isUpdate = false; Camera newCamera = PojoDataConverter.cameraConvert(cameraDTO); Iterator cameraIterator = currentCameraList.iterator(); while (cameraIterator.hasNext()) { Camera oldCamera = cameraIterator.next(); if (cameraDTO.getCode().equals(oldCamera.getCode())) { //更新 isUpdate = true; cameraIterator.remove(); newCamera.setId(oldCamera.getId()); cameraMapper.updateById(newCamera); } } if (!isUpdate) { //新增 newCamera.setDeviceId(device.getId()); newCamera.setOrgCode(device.getOrgCode()); newCamera.setCreateTime(newCamera.getUpdateTime()); cameraMapper.insert(newCamera); } } //删除 Iterator cameraIterator = currentCameraList.iterator(); while (cameraIterator.hasNext()) { Camera camera = cameraIterator.next(); cameraMapper.deleteById(camera.getId()); } } } /** * 全量同步终端数据 * * @param newOilPositionDTOS * @param device */ private void syncFullOilInfo(List newOilPositionDTOS, Device device) { if (ObjectUtil.isNotEmpty(newOilPositionDTOS)) { List currentOilList = oilPositionMapper.selectList(new QueryWrapper().eq("org_code", device.getOrgCode()).eq("device_id", device.getId())); for (OilPositionDTO oilPositionDTO : newOilPositionDTOS) { boolean isUpdate = false; OilPosition newOilPosition = PojoDataConverter.oilPositionConvert(oilPositionDTO); Iterator oilIterator = currentOilList.iterator(); while (oilIterator.hasNext()) { OilPosition oldOil = oilIterator.next(); if (oilPositionDTO.getCode().equals(oldOil.getPositionCode())) { //更新 isUpdate = true; oilIterator.remove(); newOilPosition.setId(oldOil.getId()); oilPositionMapper.updateById(newOilPosition); } } if (!isUpdate) { //新增 newOilPosition.setDeviceId(device.getId()); newOilPosition.setOrgCode(device.getOrgCode()); oilPositionMapper.insert(newOilPosition); } } //删除 Iterator oilIterator = currentOilList.iterator(); while (oilIterator.hasNext()) { OilPosition oilPosition = oilIterator.next(); oilPositionMapper.deleteById(oilPosition.getId()); } } } /** * 需全量推送基础数据组装 * @param pushDataSource * @param device * @return */ private SyncBaseUpdateDTO pushFullDataGen(String pushDataSource, Device device) { SyncBaseUpdateDTO syncBaseUpdateDTO = new SyncBaseUpdateDTO(); syncBaseUpdateDTO.setMode(SystemConstant.FULL_MODE_REQ_TYPE); SyncDataDTO syncDataDTO = new SyncDataDTO(); syncBaseUpdateDTO.setExist(syncDataDTO); List cameras = new ArrayList<>(); List oilPositions = new ArrayList<>(); syncDataDTO.setCameras(cameras); syncDataDTO.setOilPositions(oilPositions); if (pushDataSource.equals(SystemConstant.CAMERA_REQ_SOURCE) || pushDataSource.equals(SystemConstant.ALL_REQ_SOURCE)) { List cameraList = cameraMapper.selectList(new QueryWrapper().eq("device_id", device.getId())); for (Camera camera : cameraList) { CameraDTO cameraDTO = PojoDataConverter.cameraDTOConvert(camera); cameras.add(cameraDTO); } } if (pushDataSource.equals(SystemConstant.OIL_REQ_SOURCE) || pushDataSource.equals(SystemConstant.ALL_REQ_SOURCE)) { List oilPositionList = oilPositionMapper.selectList(new QueryWrapper().eq("org_code", device.getOrgCode()).eq("device_id", device.getId())); for (OilPosition oilPosition : oilPositionList) { OilPositionDTO oilPositionDTO = PojoDataConverter.oilPositionDTOConvert(oilPosition); oilPositions.add(oilPositionDTO); } } return syncBaseUpdateDTO; } /** * 推送全量数据至网关设备(废弃,改成单向同步,设备-》平台) * * @param pushDataSource 请求数据类型,camera:终端;oil:加油位;all:终端加油位都需要 * @param device 网关设备 */ @Deprecated private void pushFullDataToDevice(String pushDataSource, Device device) { SyncBaseUpdateDTO syncBaseUpdateDTO = pushFullDataGen(pushDataSource, device); log.info("发送全量基础数据至设备,msg:{}",JSON.toJSONString(syncBaseUpdateDTO)); BaseUpdatePushCommand baseUpdatePushCommand = new BaseUpdatePushCommand(mqttCommandReceiver, syncBaseUpdateDTO, sendCommandTopic, device.getSn()); if (baseUpdatePushCommand.init() == SystemConstant.DEAL_FAIL) { log.error("基础数据推送指令配置失败"); } if (baseUpdatePushCommand.execute() == SystemConstant.DEAL_FAIL) { log.error("基础数据推送指令发送失败"); } } /** * 推送增量终端数据至网关设备(已废弃,改成单向同步,设备-》平台) * * @param updateCameras 新增/更新终端数据 * @param delCameras 删除终端数据 */ @Async @Deprecated public void pushIncCamerasToDevice(List updateCameras, List delCameras) { try { Map deviceCameraMap = new HashMap<>(); if (ObjectUtil.isNotEmpty(updateCameras)) { for (Camera camera : updateCameras) { TmpSyncCameraInfo tmpSyncCameraInfo = deviceCameraMap.get(camera.getDeviceId()); if (ObjectUtil.isNull(tmpSyncCameraInfo)) { tmpSyncCameraInfo = new TmpSyncCameraInfo(); deviceCameraMap.put(camera.getDeviceId(), tmpSyncCameraInfo); } tmpSyncCameraInfo.getUpdateCameras().add(camera); } } if (ObjectUtil.isNotEmpty(delCameras)) { for (Camera camera : delCameras) { TmpSyncCameraInfo tmpSyncCameraInfo = deviceCameraMap.get(camera.getDeviceId()); if (ObjectUtil.isNull(tmpSyncCameraInfo)) { tmpSyncCameraInfo = new TmpSyncCameraInfo(); deviceCameraMap.put(camera.getDeviceId(), tmpSyncCameraInfo); } tmpSyncCameraInfo.getDelCameras().add(camera); } } for (Map.Entry entry : deviceCameraMap.entrySet()) { Device device = DeviceCache.getDeviceById(entry.getKey()); if (ObjectUtil.isNull(device)) { log.error("网关设备不存在,id:{}", entry.getKey()); continue; } SyncBaseUpdateDTO syncBaseUpdateDTO = pushIncCameraDataGen(entry.getValue().getUpdateCameras(), entry.getValue().getDelCameras(), device); log.info("发送增量基础数据至设备,msg:{}",JSON.toJSONString(syncBaseUpdateDTO)); BaseUpdatePushCommand baseUpdatePushCommand = new BaseUpdatePushCommand(mqttCommandReceiver, syncBaseUpdateDTO, sendCommandTopic, device.getSn()); if (baseUpdatePushCommand.init() == SystemConstant.DEAL_FAIL) { log.error("基础数据推送指令配置失败"); } if (baseUpdatePushCommand.execute() == SystemConstant.DEAL_FAIL) { log.error("基础数据推送指令发送失败"); } } } catch (Exception ex) { log.error("增量同步数据异常",ex); } } /** * 增量推送终端信息至设备 * * @param updateCameras * @param delCameras * @param device * @return */ private SyncBaseUpdateDTO pushIncCameraDataGen(List updateCameras, List delCameras, Device device) { SyncBaseUpdateDTO syncBaseUpdateDTO = new SyncBaseUpdateDTO(); syncBaseUpdateDTO.setMode(SystemConstant.INC_MODE_REQ_TYPE); SyncDataDTO existSyncDataDTO = new SyncDataDTO(); SyncDataDTO delSyncDataDTO = new SyncDataDTO(); syncBaseUpdateDTO.setExist(existSyncDataDTO); syncBaseUpdateDTO.setDel(delSyncDataDTO); List updateCameraDTOs = new ArrayList<>(); List delCameraDTOs = new ArrayList<>(); existSyncDataDTO.setCameras(updateCameraDTOs); delSyncDataDTO.setCameras(delCameraDTOs); if (ObjectUtil.isNotEmpty(updateCameras)) { for (Camera camera : updateCameras) { CameraDTO cameraDTO = PojoDataConverter.cameraDTOConvert(camera); updateCameraDTOs.add(cameraDTO); } } if (ObjectUtil.isNotEmpty(delCameras)) { for (Camera camera : delCameras) { CameraDTO cameraDTO = PojoDataConverter.cameraDTOConvert(camera); delCameraDTOs.add(cameraDTO); } } return syncBaseUpdateDTO; } private class TmpSyncCameraInfo { private List updateCameras = new ArrayList<>(); private List delCameras = new ArrayList<>(); public List getUpdateCameras() { return updateCameras; } public List getDelCameras() { return delCameras; } } }