package com.tievd.jyz.handler;
|
|
import cn.hutool.core.util.ObjectUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.tievd.jyz.cache.DeviceCache;
|
import com.tievd.jyz.constants.SystemConstant;
|
import com.tievd.jyz.entity.Camera;
|
import com.tievd.jyz.entity.Device;
|
import com.tievd.jyz.entity.OilPosition;
|
import com.tievd.jyz.mapper.CameraMapper;
|
import com.tievd.jyz.mapper.DeviceMapper;
|
import com.tievd.jyz.mapper.OilPositionMapper;
|
import com.tievd.jyz.mqtt.PojoDataConverter;
|
import com.tievd.jyz.mqtt.command.BaseUpdatePullCommand;
|
import com.tievd.jyz.mqtt.command.BaseUpdatePushCommand;
|
import com.tievd.jyz.mqtt.command.MqttCommandReceiver;
|
import com.tievd.jyz.mqtt.dto.*;
|
import com.tievd.jyz.util.BeanUtil;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Component;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import java.util.*;
|
|
/**
|
* 基础数据同步处理器
|
*
|
* @author timi
|
*/
|
@Slf4j
|
@Component
|
public class BaseDataSyncHandler {
|
|
// Mapper used by this handler to query, insert, update and delete Camera rows.
@Autowired
|
private CameraMapper cameraMapper;
|
// Mapper used to look up the gateway Device by its serial number ("sn" column).
@Autowired
|
private DeviceMapper deviceMapper;
|
// Mapper used by this handler to query, insert, update and delete OilPosition rows.
@Autowired
|
private OilPositionMapper oilPositionMapper;
|
// Receiver handed to every pull/push command object constructed in this class.
@Autowired
|
private MqttCommandReceiver mqttCommandReceiver;
|
// Topic passed to the command objects; read from "init.sendCommandTopic" with the default shown below.
@Value("${init.sendCommandTopic:/ty/aibox/command/}")
|
private String sendCommandTopic;
|
|
/**
|
* 基础数据核验操作
|
*
|
* @param mqttParamDTO
|
*/
|
public void baseUpdateCheck(MqttParamDTO<JSON> mqttParamDTO) {
|
String deviceSN = mqttParamDTO.getSn();
|
Device device = deviceMapper.selectOne(new QueryWrapper<Device>().eq("sn", deviceSN));
|
if (ObjectUtil.isNull(device)) {
|
log.error("平台未找到网关设备,操作失败,SN:{}", deviceSN);
|
return;
|
}
|
String pullDataSource = StringUtils.EMPTY;
|
String pushDataSource = StringUtils.EMPTY;
|
Date localCameraTime = cameraMapper.findMaxChangeTime(device.getId());
|
Date localOilPositionTime = oilPositionMapper.findMaxChangeTime(device.getOrgCode());
|
Long aiBoxCameraUpTime = ((JSONObject) mqttParamDTO.getData()).getLong("cameraUpTime");
|
Long aiBoxOilPositionUpTime = ((JSONObject) mqttParamDTO.getData()).getLong("oilPositionUpTime");
|
if (ObjectUtil.isNull(localCameraTime)
|
|| localCameraTime.getTime() < aiBoxCameraUpTime) {
|
//终端平台更新时间小于设备更新时间,获取终端
|
log.info("终端平台更新时间小于设备更新时间,获取终端,deviceSN:{}",deviceSN);
|
pullDataSource = SystemConstant.CAMERA_REQ_SOURCE;
|
} else {
|
//终端平台更新时间大于设备更新时间,下发终端
|
log.info("终端平台更新时间大于设备更新时间,下发终端(已废弃)");
|
pushDataSource = SystemConstant.CAMERA_REQ_SOURCE;
|
}
|
if (ObjectUtil.isNull(localOilPositionTime)
|
|| localOilPositionTime.getTime() < aiBoxOilPositionUpTime) {
|
//加油位平台更新时间小于设备更新时间,获取加油位
|
log.info("加油位平台更新时间小于设备更新时间,获取加油位,deviceSN:{}",deviceSN);
|
pullDataSource = pullDataSource.equals(SystemConstant.CAMERA_REQ_SOURCE) ? SystemConstant.ALL_REQ_SOURCE : SystemConstant.OIL_REQ_SOURCE;
|
} else {
|
//加油位平台更新时间大于设备更新时间,下发加油位(暂时不下发加油位,加油位只单向同步)
|
//pushDataSource = pushDataSource.equals(SystemConstant.CAMERA_REQ_SOURCE) ? SystemConstant.ALL_REQ_SOURCE : SystemConstant.OIL_REQ_SOURCE;
|
}
|
if (StringUtils.isNotEmpty(pushDataSource)) {
|
//推送数据
|
//屏蔽推终端逻辑(改成单向同步,设备-》平台)
|
//pushFullDataToDevice(pushDataSource, device);
|
}
|
pullData(pullDataSource,deviceSN);
|
}
|
|
/**
|
* 拉取/同步基础数据
|
* @param pullDataSource 拉取数据类型:camera:终端; oil:加油区;all:所有(目前包括终端和加油区);
|
* SystemConstant.CAMERA_REQ_SOURCE
|
* SystemConstant.OIL_REQ_SOURCE
|
* SystemConstant.ALL_REQ_SOURCE
|
* @param deviceSN 网关序列号
|
*/
|
public void pullData(String pullDataSource,String deviceSN){
|
if (StringUtils.isNotEmpty(pullDataSource)) {
|
//拉取数据
|
BaseUpdatePullCommand baseUpdatePullCommand = new BaseUpdatePullCommand(mqttCommandReceiver, pullDataSource, sendCommandTopic, deviceSN);
|
if (baseUpdatePullCommand.init() == SystemConstant.DEAL_FAIL) {
|
log.error("基础数据拉取指令配置失败");
|
}
|
if (baseUpdatePullCommand.execute() == SystemConstant.DEAL_FAIL) {
|
log.error("基础数据拉取指令发送失败");
|
}
|
}
|
}
|
|
/**
|
* 上报的基础数据更新
|
*/
|
@Transactional(rollbackFor = RuntimeException.class)
|
public void baseInfoUpdate(MqttParamDTO<JSON> mqttParamDTO) {
|
String deviceSN = mqttParamDTO.getSn();
|
Device device = deviceMapper.selectOne(new QueryWrapper<Device>().eq("sn", deviceSN));
|
if (ObjectUtil.isNull(device)) {
|
log.error("平台未找到网关设备,操作失败,SN:{}", deviceSN);
|
return;
|
}
|
SyncBaseUpdateDTO syncBaseUpdateDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), SyncBaseUpdateDTO.class);
|
if (SystemConstant.FULL_MODE_REQ_TYPE.equals(syncBaseUpdateDTO.getMode())) {
|
//全量更新
|
List<CameraDTO> cameras = syncBaseUpdateDTO.getExist().getCameras();
|
if (ObjectUtil.isNotEmpty(cameras)) {
|
syncFullCameraInfo(cameras, device);
|
}
|
List<OilPositionDTO> oilPositions = syncBaseUpdateDTO.getExist().getOilPositions();
|
if (ObjectUtil.isNotEmpty(oilPositions)) {
|
syncFullOilInfo(oilPositions, device);
|
}
|
} else {
|
//增量更新
|
List<CameraDTO> addOrUpdateCameras = syncBaseUpdateDTO.getExist().getCameras();
|
List<CameraDTO> delCameras = syncBaseUpdateDTO.getDel().getCameras();
|
syncIncCameraInfo(addOrUpdateCameras, delCameras, device);
|
|
List<OilPositionDTO> addOrUpdateOilDTOs = syncBaseUpdateDTO.getExist().getOilPositions();
|
List<OilPositionDTO> delOilDTOs = syncBaseUpdateDTO.getDel().getOilPositions();
|
syncIncOilInfo(addOrUpdateOilDTOs, delOilDTOs, device);
|
}
|
}
|
|
/**
|
* 增量同步终端信息
|
*
|
* @param addOrUpdateCameras
|
* @param delCameras
|
*/
|
private void syncIncCameraInfo(List<CameraDTO> addOrUpdateCameras, List<CameraDTO> delCameras, Device device) {
|
if(ObjectUtil.isNotEmpty(addOrUpdateCameras)){
|
for (CameraDTO cameraDTO : addOrUpdateCameras) {
|
if(StringUtils.isEmpty(cameraDTO.getCode())){
|
continue;
|
}
|
Camera camera = cameraMapper.selectOne(new QueryWrapper<Camera>().eq("code", cameraDTO.getCode()).eq("device_id", device.getId()));
|
Camera camera1 = PojoDataConverter.cameraConvert(cameraDTO);
|
if (ObjectUtil.isNull(camera)) {
|
//新增
|
camera1.setCreateTime(new Date());
|
camera1.setDeviceId(device.getId());
|
camera1.setOrgCode(device.getOrgCode());
|
camera1.setCreateTime(camera1.getUpdateTime());
|
cameraMapper.insert(camera1);
|
} else {
|
//更新
|
BeanUtil.copyPropertiesIgnoreNull(camera1, camera);
|
cameraMapper.updateById(camera);
|
}
|
}
|
}
|
if(ObjectUtil.isNotEmpty(delCameras)){
|
for (CameraDTO cameraDTO : delCameras) {
|
if(StringUtils.isEmpty(cameraDTO.getCode())){
|
continue;
|
}
|
cameraMapper.delete(new QueryWrapper<Camera>().eq("code", cameraDTO.getCode()).eq("device_id", device.getId()));
|
}
|
}
|
}
|
|
/**
|
* 增量同步加油位信息
|
*
|
* @param addOrUpdateOils
|
* @param delOils
|
*/
|
private void syncIncOilInfo(List<OilPositionDTO> addOrUpdateOils, List<OilPositionDTO> delOils, Device device) {
|
if(ObjectUtil.isNotEmpty(addOrUpdateOils)){
|
for (OilPositionDTO oilPositionDTO : addOrUpdateOils) {
|
OilPosition oilPosition = oilPositionMapper.selectOne(new QueryWrapper<OilPosition>().eq("position_code", oilPositionDTO.getCode()).eq("org_code", device.getOrgCode()).eq("device_id", device.getId()));
|
OilPosition oilPosition1 = PojoDataConverter.oilPositionConvert(oilPositionDTO);
|
if (ObjectUtil.isNull(oilPosition)) {
|
//新增
|
oilPosition1.setOrgCode(device.getOrgCode());
|
oilPosition1.setDeviceId(device.getId());
|
oilPositionMapper.insert(oilPosition1);
|
} else {
|
//更新
|
BeanUtil.copyPropertiesIgnoreNull(oilPosition1, oilPosition);
|
oilPositionMapper.updateById(oilPosition);
|
}
|
}
|
}
|
if(ObjectUtil.isNotEmpty(delOils)){
|
for (OilPositionDTO oilPositionDTO : delOils) {
|
oilPositionMapper.delete(new QueryWrapper<OilPosition>().eq("position_code", oilPositionDTO.getCode()).eq("org_code", device.getOrgCode()));
|
}
|
}
|
}
|
|
/**
|
* 全量同步终端数据
|
*
|
* @param newCameras
|
* @param device
|
*/
|
private void syncFullCameraInfo(List<CameraDTO> newCameras, Device device) {
|
if (ObjectUtil.isNotEmpty(newCameras)) {
|
List<Camera> currentCameraList = cameraMapper.selectList(new QueryWrapper<Camera>().eq("device_id", device.getId()));
|
for (CameraDTO cameraDTO : newCameras) {
|
boolean isUpdate = false;
|
Camera newCamera = PojoDataConverter.cameraConvert(cameraDTO);
|
Iterator<Camera> cameraIterator = currentCameraList.iterator();
|
while (cameraIterator.hasNext()) {
|
Camera oldCamera = cameraIterator.next();
|
if (cameraDTO.getCode().equals(oldCamera.getCode())) {
|
//更新
|
isUpdate = true;
|
cameraIterator.remove();
|
newCamera.setId(oldCamera.getId());
|
cameraMapper.updateById(newCamera);
|
}
|
}
|
if (!isUpdate) {
|
//新增
|
newCamera.setDeviceId(device.getId());
|
newCamera.setOrgCode(device.getOrgCode());
|
newCamera.setCreateTime(newCamera.getUpdateTime());
|
cameraMapper.insert(newCamera);
|
}
|
}
|
//删除
|
Iterator<Camera> cameraIterator = currentCameraList.iterator();
|
while (cameraIterator.hasNext()) {
|
Camera camera = cameraIterator.next();
|
cameraMapper.deleteById(camera.getId());
|
}
|
}
|
}
|
|
/**
|
* 全量同步终端数据
|
*
|
* @param newOilPositionDTOS
|
* @param device
|
*/
|
private void syncFullOilInfo(List<OilPositionDTO> newOilPositionDTOS, Device device) {
|
if (ObjectUtil.isNotEmpty(newOilPositionDTOS)) {
|
List<OilPosition> currentOilList = oilPositionMapper.selectList(new QueryWrapper<OilPosition>().eq("org_code", device.getOrgCode()).eq("device_id", device.getId()));
|
for (OilPositionDTO oilPositionDTO : newOilPositionDTOS) {
|
boolean isUpdate = false;
|
OilPosition newOilPosition = PojoDataConverter.oilPositionConvert(oilPositionDTO);
|
Iterator<OilPosition> oilIterator = currentOilList.iterator();
|
while (oilIterator.hasNext()) {
|
OilPosition oldOil = oilIterator.next();
|
if (oilPositionDTO.getCode().equals(oldOil.getPositionCode())) {
|
//更新
|
isUpdate = true;
|
oilIterator.remove();
|
newOilPosition.setId(oldOil.getId());
|
oilPositionMapper.updateById(newOilPosition);
|
}
|
}
|
if (!isUpdate) {
|
//新增
|
newOilPosition.setDeviceId(device.getId());
|
newOilPosition.setOrgCode(device.getOrgCode());
|
oilPositionMapper.insert(newOilPosition);
|
}
|
}
|
//删除
|
Iterator<OilPosition> oilIterator = currentOilList.iterator();
|
while (oilIterator.hasNext()) {
|
OilPosition oilPosition = oilIterator.next();
|
oilPositionMapper.deleteById(oilPosition.getId());
|
}
|
}
|
}
|
|
/**
|
* 需全量推送基础数据组装
|
* @param pushDataSource
|
* @param device
|
* @return
|
*/
|
private SyncBaseUpdateDTO pushFullDataGen(String pushDataSource, Device device) {
|
SyncBaseUpdateDTO syncBaseUpdateDTO = new SyncBaseUpdateDTO();
|
syncBaseUpdateDTO.setMode(SystemConstant.FULL_MODE_REQ_TYPE);
|
SyncDataDTO syncDataDTO = new SyncDataDTO();
|
syncBaseUpdateDTO.setExist(syncDataDTO);
|
List<CameraDTO> cameras = new ArrayList<>();
|
List<OilPositionDTO> oilPositions = new ArrayList<>();
|
syncDataDTO.setCameras(cameras);
|
syncDataDTO.setOilPositions(oilPositions);
|
if (pushDataSource.equals(SystemConstant.CAMERA_REQ_SOURCE) || pushDataSource.equals(SystemConstant.ALL_REQ_SOURCE)) {
|
List<Camera> cameraList = cameraMapper.selectList(new QueryWrapper<Camera>().eq("device_id", device.getId()));
|
for (Camera camera : cameraList) {
|
CameraDTO cameraDTO = PojoDataConverter.cameraDTOConvert(camera);
|
cameras.add(cameraDTO);
|
}
|
}
|
if (pushDataSource.equals(SystemConstant.OIL_REQ_SOURCE) || pushDataSource.equals(SystemConstant.ALL_REQ_SOURCE)) {
|
List<OilPosition> oilPositionList = oilPositionMapper.selectList(new QueryWrapper<OilPosition>().eq("org_code", device.getOrgCode()).eq("device_id", device.getId()));
|
for (OilPosition oilPosition : oilPositionList) {
|
OilPositionDTO oilPositionDTO = PojoDataConverter.oilPositionDTOConvert(oilPosition);
|
oilPositions.add(oilPositionDTO);
|
}
|
}
|
return syncBaseUpdateDTO;
|
}
|
|
/**
|
* 推送全量数据至网关设备(废弃,改成单向同步,设备-》平台)
|
*
|
* @param pushDataSource 请求数据类型,camera:终端;oil:加油位;all:终端加油位都需要
|
* @param device 网关设备
|
*/
|
@Deprecated
|
private void pushFullDataToDevice(String pushDataSource, Device device) {
|
SyncBaseUpdateDTO syncBaseUpdateDTO = pushFullDataGen(pushDataSource, device);
|
log.info("发送全量基础数据至设备,msg:{}",JSON.toJSONString(syncBaseUpdateDTO));
|
BaseUpdatePushCommand baseUpdatePushCommand = new BaseUpdatePushCommand(mqttCommandReceiver, syncBaseUpdateDTO, sendCommandTopic, device.getSn());
|
if (baseUpdatePushCommand.init() == SystemConstant.DEAL_FAIL) {
|
log.error("基础数据推送指令配置失败");
|
}
|
if (baseUpdatePushCommand.execute() == SystemConstant.DEAL_FAIL) {
|
log.error("基础数据推送指令发送失败");
|
}
|
}
|
|
/**
|
* 推送增量终端数据至网关设备(已废弃,改成单向同步,设备-》平台)
|
*
|
* @param updateCameras 新增/更新终端数据
|
* @param delCameras 删除终端数据
|
*/
|
@Async
|
@Deprecated
|
public void pushIncCamerasToDevice(List<Camera> updateCameras, List<Camera> delCameras) {
|
try {
|
Map<Long, TmpSyncCameraInfo> deviceCameraMap = new HashMap<>();
|
if (ObjectUtil.isNotEmpty(updateCameras)) {
|
for (Camera camera : updateCameras) {
|
TmpSyncCameraInfo tmpSyncCameraInfo = deviceCameraMap.get(camera.getDeviceId());
|
if (ObjectUtil.isNull(tmpSyncCameraInfo)) {
|
tmpSyncCameraInfo = new TmpSyncCameraInfo();
|
deviceCameraMap.put(camera.getDeviceId(), tmpSyncCameraInfo);
|
}
|
tmpSyncCameraInfo.getUpdateCameras().add(camera);
|
}
|
}
|
if (ObjectUtil.isNotEmpty(delCameras)) {
|
for (Camera camera : delCameras) {
|
TmpSyncCameraInfo tmpSyncCameraInfo = deviceCameraMap.get(camera.getDeviceId());
|
if (ObjectUtil.isNull(tmpSyncCameraInfo)) {
|
tmpSyncCameraInfo = new TmpSyncCameraInfo();
|
deviceCameraMap.put(camera.getDeviceId(), tmpSyncCameraInfo);
|
}
|
tmpSyncCameraInfo.getDelCameras().add(camera);
|
}
|
}
|
for (Map.Entry<Long, TmpSyncCameraInfo> entry : deviceCameraMap.entrySet()) {
|
Device device = DeviceCache.getDeviceById(entry.getKey());
|
if (ObjectUtil.isNull(device)) {
|
log.error("网关设备不存在,id:{}", entry.getKey());
|
continue;
|
}
|
SyncBaseUpdateDTO syncBaseUpdateDTO = pushIncCameraDataGen(entry.getValue().getUpdateCameras(), entry.getValue().getDelCameras(), device);
|
log.info("发送增量基础数据至设备,msg:{}",JSON.toJSONString(syncBaseUpdateDTO));
|
BaseUpdatePushCommand baseUpdatePushCommand = new BaseUpdatePushCommand(mqttCommandReceiver, syncBaseUpdateDTO, sendCommandTopic, device.getSn());
|
if (baseUpdatePushCommand.init() == SystemConstant.DEAL_FAIL) {
|
log.error("基础数据推送指令配置失败");
|
}
|
if (baseUpdatePushCommand.execute() == SystemConstant.DEAL_FAIL) {
|
log.error("基础数据推送指令发送失败");
|
}
|
}
|
} catch (Exception ex) {
|
log.error("增量同步数据异常",ex);
|
}
|
}
|
|
/**
|
* 增量推送终端信息至设备
|
*
|
* @param updateCameras
|
* @param delCameras
|
* @param device
|
* @return
|
*/
|
private SyncBaseUpdateDTO pushIncCameraDataGen(List<Camera> updateCameras, List<Camera> delCameras, Device device) {
|
SyncBaseUpdateDTO syncBaseUpdateDTO = new SyncBaseUpdateDTO();
|
syncBaseUpdateDTO.setMode(SystemConstant.INC_MODE_REQ_TYPE);
|
SyncDataDTO existSyncDataDTO = new SyncDataDTO();
|
SyncDataDTO delSyncDataDTO = new SyncDataDTO();
|
syncBaseUpdateDTO.setExist(existSyncDataDTO);
|
syncBaseUpdateDTO.setDel(delSyncDataDTO);
|
List<CameraDTO> updateCameraDTOs = new ArrayList<>();
|
List<CameraDTO> delCameraDTOs = new ArrayList<>();
|
existSyncDataDTO.setCameras(updateCameraDTOs);
|
delSyncDataDTO.setCameras(delCameraDTOs);
|
if (ObjectUtil.isNotEmpty(updateCameras)) {
|
for (Camera camera : updateCameras) {
|
CameraDTO cameraDTO = PojoDataConverter.cameraDTOConvert(camera);
|
updateCameraDTOs.add(cameraDTO);
|
}
|
}
|
if (ObjectUtil.isNotEmpty(delCameras)) {
|
for (Camera camera : delCameras) {
|
CameraDTO cameraDTO = PojoDataConverter.cameraDTOConvert(camera);
|
delCameraDTOs.add(cameraDTO);
|
}
|
}
|
return syncBaseUpdateDTO;
|
}
|
|
private class TmpSyncCameraInfo {
|
|
private List<Camera> updateCameras = new ArrayList<>();
|
private List<Camera> delCameras = new ArrayList<>();
|
|
public List<Camera> getUpdateCameras() {
|
return updateCameras;
|
}
|
|
public List<Camera> getDelCameras() {
|
return delCameras;
|
}
|
|
}
|
|
|
}
|