package com.ycl.task;

import com.alibaba.druid.support.json.JSONUtils;
import com.alibaba.fastjson2.JSONArray;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ycl.platform.domain.entity.TMonitor;
import com.ycl.platform.domain.entity.YwPoint;
import com.ycl.platform.domain.result.UY.MonitorQualifyResult;
import com.ycl.platform.domain.result.UY.OneMachineFileResult;
import com.ycl.platform.domain.vo.TMonitorVO;
import com.ycl.platform.mapper.TMonitorMapper;
import com.ycl.platform.mapper.YwPointMapper;
import com.ycl.platform.service.ITMonitorService;
import com.ycl.platform.service.YwPointService;
import com.ycl.system.entity.SysDictData;
import com.ycl.system.service.ISysDictDataService;
import com.ycl.utils.DateUtils;
import com.ycl.utils.StringUtils;
import constant.RedisConstant;
import enumeration.general.AreaDeptEnum;
import enumeration.general.PointStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Evening task that synchronizes the current day's "one machine, one file" device records
 * from MongoDB into the relational monitor and point tables.
 */
@Slf4j
@Component("monitorTask")
public class MonitorTask {

    /** Dictionary type listing the capture areas that count as important sites. */
    private static final String IMPORTANT_SITE_DICT_TYPE = "platform_important_site";

    /** Number of leading serial-number characters that form the area code. */
    private static final int AREA_CODE_LENGTH = 6;

    /** Number of leading serial-number characters that form the civil (administrative) code. */
    private static final int CIVIL_CODE_LENGTH = 8;

    /** Department id used when the area code cannot be resolved to a department. */
    private static final int UNKNOWN_DEPT_ID = -1;

    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private ITMonitorService monitorService;
    @Autowired
    private TMonitorMapper monitorMapper;
    @Autowired
    private ISysDictDataService dictDataService;
    @Autowired
    private YwPointService ywPointService;
    @Autowired
    private YwPointMapper ywPointMapper;
    // Left as a raw type on purpose so the injected bean keeps matching the existing declaration.
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Synchronizes today's MongoDB device records into the monitor and point tables.
     *
     * <p>Steps:
     * <ol>
     *   <li>Load the MongoDB records whose {@code mongoCreateTime} falls within the current day.</li>
     *   <li>Convert each record into a {@link TMonitor} and a {@link YwPoint}, reusing the existing
     *       database row when the serial number is already known.</li>
     *   <li>Append the existing rows that were not part of today's records, so they survive the
     *       delete-and-reinsert below.</li>
     *   <li>Replace the content of both tables, then publish the newly discovered monitors to Redis
     *       under {@code RedisConstant.New_Monitor_Set}.</li>
     * </ol>
     *
     * <p>The database work runs in one transaction that rolls back on any exception.
     */
    @Transactional(rollbackFor = Exception.class)
    public void synchronize() {
        log.info("开始同步mongodb一机一档到数据库");

        // Query the current day. A hard-coded debug date used to be left here, which made the
        // task re-import the same day on every run.
        Date today = new Date();
        Query query = new Query(Criteria.where("mongoCreateTime")
                .gte(DateUtils.getDayStart(today))
                .lt(DateUtils.getDayEnd(today)));
        List<MonitorQualifyResult> oneMachineFileResults = mongoTemplate.find(query, MonitorQualifyResult.class);

        // Existing monitor rows, keyed by serial number.
        Map<String, TMonitorVO> monitorVOMap =
                indexBySerialNumber(monitorMapper.selectMonitorVOList(), TMonitorVO::getSerialNumber);
        // Existing point rows, keyed by serial number.
        Map<String, YwPoint> pointMap =
                indexBySerialNumber(ywPointService.list(new QueryWrapper<>()), YwPoint::getSerialNumber);

        // Dictionary of important sites (matched against the SXJCJQY capture-area field).
        SysDictData dictQuery = new SysDictData();
        dictQuery.setDictType(IMPORTANT_SITE_DICT_TYPE);
        List<SysDictData> dictDataList = dictDataService.selectDictDataList(dictQuery);
        // Capture-area values that mark a point as important.
        List<String> importantSite = dictDataList.stream()
                .map(SysDictData::getDictValue)
                .collect(Collectors.toList());

        // Rows to be written to the monitor table.
        List<TMonitor> monitorList = new ArrayList<>();
        // Rows to be written to the point table.
        List<YwPoint> ywPointList = new ArrayList<>();
        // Monitors that did not exist in the database before this run.
        Set<TMonitor> newMonitorList = new HashSet<>();

        for (MonitorQualifyResult result : oneMachineFileResults) {
            TMonitor monitor = getMonitor(result, monitorVOMap);
            YwPoint point = getPoint(result, pointMap, importantSite);
            monitorList.add(monitor);
            ywPointList.add(point);
            // A monitor is only flagged as new when the table already held data;
            // with an empty table nothing is flagged.
            if (!CollectionUtils.isEmpty(monitorVOMap)
                    && !monitorVOMap.containsKey(result.getSerialNumber().getValue())) {
                newMonitorList.add(monitor);
            }
        }

        // Carry over existing monitors that were absent from today's records.
        // Set-based lookups keep this linear instead of scanning a list per entry.
        Set<String> syncedMonitorNumbers = monitorList.stream()
                .map(TMonitor::getSerialNumber)
                .collect(Collectors.toSet());
        monitorVOMap.forEach((serialNumber, existing) -> {
            if (!syncedMonitorNumbers.contains(serialNumber)) {
                TMonitor monitor = new TMonitor();
                BeanUtils.copyProperties(existing, monitor);
                monitorList.add(monitor);
            }
        });

        // Carry over existing points that were absent from today's records.
        Set<String> syncedPointNumbers = ywPointList.stream()
                .map(YwPoint::getSerialNumber)
                .collect(Collectors.toSet());
        pointMap.forEach((serialNumber, existing) -> {
            if (!syncedPointNumbers.contains(serialNumber)) {
                ywPointList.add(existing);
            }
        });

        log.info("result集合{},设备集合{},点位集合{}", oneMachineFileResults.size(), monitorList.size(), ywPointList.size());

        // Replace the table contents; each table is only wiped when there is something to insert.
        if (!CollectionUtils.isEmpty(monitorList)) {
            monitorMapper.deleteAll();
            monitorService.saveBatch(monitorList);
        }
        if (!CollectionUtils.isEmpty(ywPointList)) {
            ywPointMapper.deleteAll();
            ywPointService.saveBatch(ywPointList);
        }

        // Publish the new monitors to Redis for the assessment-indicator task to consume.
        redisTemplate.opsForValue().set(RedisConstant.New_Monitor_Set, JSONArray.toJSONString(newMonitorList));
        log.info("结束同步mongodb一机一档到数据库");
    }

    /**
     * Indexes rows by serial number. When two rows share a serial number the first one wins and a
     * warning is logged, instead of failing the whole synchronization with an
     * {@link IllegalStateException}.
     *
     * @param rows           rows to index
     * @param serialNumberOf extracts the serial number of a row
     * @param <T>            row type
     * @return map from serial number to row
     */
    private static <T> Map<String, T> indexBySerialNumber(Collection<T> rows, Function<T, String> serialNumberOf) {
        return rows.stream().collect(Collectors.toMap(
                serialNumberOf,
                Function.identity(),
                (first, duplicate) -> {
                    log.warn("Duplicate serial number {} found, keeping the first row", serialNumberOf.apply(first));
                    return first;
                }));
    }

    /**
     * Builds the point row for one MongoDB record.
     *
     * <p>An existing point with the same serial number is reused (and mutated in place); otherwise
     * a new point is created in the {@code PointStatus.WAIT} state with all tags set to false.
     * The important tag is switched on when the record's capture area is listed in
     * {@code importantSite}; it is never switched off here. The department id is always
     * recomputed from the serial number.
     *
     * @param result        MongoDB record
     * @param pointMap      existing points keyed by serial number
     * @param importantSite capture-area values that mark a point as important
     * @return the point to persist
     */
    private YwPoint getPoint(MonitorQualifyResult result, Map<String, YwPoint> pointMap, List<String> importantSite) {
        String serialNumber = result.getSerialNumber().getValue();

        YwPoint ywPoint = pointMap.get(serialNumber);
        if (!pointMap.containsKey(serialNumber)) {
            Date now = new Date();
            ywPoint = new YwPoint();
            ywPoint.setPointName(result.getName().getValue());
            ywPoint.setStatus(PointStatus.WAIT.getDesc());
            ywPoint.setSerialNumber(serialNumber);
            ywPoint.setImportantTag(Boolean.FALSE);
            ywPoint.setProvinceTag(Boolean.FALSE);
            ywPoint.setImportantCommandImageTag(Boolean.FALSE);
            ywPoint.setCreateTime(now);
            ywPoint.setUpdateTime(now);
        }

        // Flag the point as important when its capture area is in the dictionary.
        if (importantSite.contains(result.getSxjcjqy().getValue())) {
            ywPoint.setImportantTag(Boolean.TRUE);
        }

        // Resolve the department from the area code prefix of the serial number.
        // The id stays at UNKNOWN_DEPT_ID when the serial number is empty, too short,
        // or its area code is not mapped.
        Integer deptId = UNKNOWN_DEPT_ID;
        if (!StringUtils.isEmpty(serialNumber) && serialNumber.length() >= AREA_CODE_LENGTH) {
            AreaDeptEnum areaDeptEnum = AreaDeptEnum.fromCode(serialNumber.substring(0, AREA_CODE_LENGTH));
            if (areaDeptEnum != null) {
                deptId = areaDeptEnum.getDeptId();
            }
        }
        ywPoint.setDeptId(deptId.longValue());
        return ywPoint;
    }

    /**
     * Builds the monitor row for one MongoDB record.
     *
     * <p>When a monitor with the same serial number already exists only its id is reused; every
     * other field is taken from the MongoDB record.
     *
     * @param result       MongoDB record
     * @param monitorVOMap existing monitors keyed by serial number
     * @return the monitor to persist
     */
    private TMonitor getMonitor(MonitorQualifyResult result, Map<String, TMonitorVO> monitorVOMap) {
        String serialNumber = result.getSerialNumber().getValue();

        TMonitor monitor = new TMonitor();
        TMonitorVO existing = monitorVOMap.get(serialNumber);
        if (existing != null) {
            monitor.setId(existing.getId());
        }
        monitor.setSerialNumber(serialNumber);
        monitor.setName(result.getName().getValue());

        String siteType = result.getJkdwlx().getValue();
        if (!StringUtils.isEmpty(siteType)) {
            monitor.setSiteType(Long.valueOf(siteType));
        }

        monitor.setMacAddr(result.getMacdz().getValue());
        monitor.setIp(result.getIp().getValue());
        monitor.setCameraFunType(result.getSxjgnlx().getValue());
        monitor.setLongitude(result.getLongitude().getValue());
        monitor.setLatitude(result.getLatitude().getValue() + "");
        monitor.setCameraCaptureArea(result.getSxjcjqy().getValue());

        String onState = result.getSbzt().getValue();
        if (!StringUtils.isEmpty(onState)) {
            monitor.setOnState(Long.valueOf(onState));
        }

        // The first eight characters of the national-standard code are the administrative code.
        // Shorter serial numbers leave the civil code unset instead of throwing.
        if (!StringUtils.isEmpty(serialNumber) && serialNumber.length() >= CIVIL_CODE_LENGTH) {
            monitor.setCivilCode(serialNumber.substring(0, CIVIL_CODE_LENGTH));
        }
        return monitor;
    }
}