package com.ycl.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.google.common.util.concurrent.AtomicDouble; import com.ycl.platform.domain.entity.DemeritRecord; import com.ycl.platform.domain.entity.MonitorConstruction; import com.ycl.platform.domain.entity.Report; import com.ycl.platform.domain.result.UY.RecordMetaDSumResult; import com.ycl.platform.mapper.DemeritRecordMapper; import com.ycl.platform.mapper.IMonitorConstructionMapper; import com.ycl.platform.mapper.ReportMapper; import com.ycl.platform.service.IDemeritRecordService; import com.ycl.utils.DateUtils; import enumeration.ConstructionTypeEnum; import enumeration.general.AreaDeptEnum; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Component; import utils.StringUtils; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; /** * zgyw * * @author : zxl * @date : 2025-09-15 17:32 **/ @Slf4j @RequiredArgsConstructor @Component("demeritRecordTask") public class DemeritRecordTask { private final MongoTemplate mongoTemplate; private final ReportMapper reportMapper; private final IMonitorConstructionMapper monitorConstructionMapper; private static final ExecutorService executorService = new ThreadPoolExecutor(16, 128, 5000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); private final IDemeritRecordService demeritRecordService; private DemeritRecord buildDemeritRecord(String constructionType,BigDecimal demerit,Integer deptId,Date recordTime) { DemeritRecord record = new DemeritRecord(); record.setConstructionType(constructionType); record.setDemerit(demerit); record.setDeptId(deptId); record.setRecordTime(recordTime); return record; } private BigDecimal calculateTotalDeduction(List records) { BigDecimal total = BigDecimal.ZERO; // 单线程循环,无需AtomicReference for (RecordMetaDSumResult record : records) { // 安全处理空值:若missDuration为null,默认按0分钟处理 Double missDurationHours = record.getMissDuration(); double missDurationMinutes = Optional.ofNullable(missDurationHours) .map(hours -> hours * 60.0) // 小时转分钟 .orElse(0.0); // 空值默认0分钟 // 计算单条记录的扣分 BigDecimal deduction; if (missDurationMinutes > 720) { deduction = new BigDecimal("2.0"); } else if (missDurationMinutes > 240) { deduction = new BigDecimal("1.5"); } else if (missDurationMinutes > 60) { deduction = new BigDecimal("1.0"); } else if (missDurationMinutes > 30) { deduction = new BigDecimal("0.5"); } else { deduction = BigDecimal.ZERO; } // 累加总扣分(最后统一处理精度,避免中间四舍五入误差) total = total.add(deduction); } // 最终统一保留1位小数,四舍五入 return total.setScale(1, RoundingMode.HALF_UP); } public void run(){ log.info("开始执行计算每日扣分记录情况"); //获得mongodb中所有全景的设备 //mongodb查询条件 Date today =new Date(); // Calendar calendar = Calendar.getInstance(); // calendar.setTime(today); // calendar.add(Calendar.DAY_OF_YEAR, -2); // 减去2天 测试用 // Date twoDaysAgo = calendar.getTime(); //计算录像可用率和重点录像可用率 Query query = new Query(); query.addCriteria(Criteria .where("mongoCreateTime").gte(DateUtils.getDayStart(today)).lt(DateUtils.getDayEnd(today))); List results = mongoTemplate.find(query, RecordMetaDSumResult.class); log.info("日期:{},查询出的设备录像记录数{}",today,results.size()); //过滤掉非全景的设备 且 1则为细节,如果是0则为全景 results = results.stream().filter(obj -> { String deviceId = obj.getDeviceId(); if (deviceId == null || deviceId.length() < 7) { return false; // 去除元素 } // 获取倒数第七位的字符 char seventhFromEnd = deviceId.charAt(deviceId.length() - 7); return seventhFromEnd != '1'; //为1 则会 false 去除掉 }).collect(Collectors.toList()); log.info("过滤后剩余全景设备数{}",results.size()); //只考核LT_、(三期) // DX_ 、(一二期) // DX_R、(四区人脸) // DX_RS、(四区人脸) // (需要排除DX_R2、DX_RD、J_、T1、T3以及没有前缀的设备) // List prefixes = Arrays.asList("LT_", "DX_", "DX_R", "DX_RS"); //查询设备标签表中的MonitorConstruction列表 List monitorConstructionList = new LambdaQueryChainWrapper<>(monitorConstructionMapper) .eq(MonitorConstruction::getDeleted, Boolean.FALSE) .list(); List serialNumberList = monitorConstructionList.stream() .map(MonitorConstruction::getSerialNumber).collect(Collectors.toList()); //过滤获得包含了这些标签的设备录像情况集合 results = results.stream() .filter(result -> { String sn = result.getNo(); // 任一字段非空且在集合中即可 return (sn != null && serialNumberList.contains(sn)); }) .collect(Collectors.toList()); log.info("剩余考核设备过滤后设备数{}",results.size()); //过滤掉报备的设备 //查询在当前时间有报备的所有设备, //因为录像数据的时间 Date yesterday =new Date(); Calendar calendar = Calendar.getInstance(); calendar.setTime(yesterday); calendar.add(Calendar.DAY_OF_YEAR, -1); // 减去1天 测试用 yesterday = calendar.getTime(); log.info("测试时间:{}",yesterday); List list = new LambdaQueryChainWrapper<>(reportMapper) .eq(Report::getStatus, 1) .ge(Report::getBeginCreateTime, DateUtils.getDayStart(yesterday)) .le(Report::getEndCreateTime, DateUtils.getDayEnd(yesterday)) .list(); log.info("报备记录:{}",list); List deviceIds = list.stream() .collect(Collectors.toMap( Report::getSerialNumber, // key: serialNumber Function.identity(), // value: Report对象本身 (existing, replacement) -> // 当key冲突时,保留createTime更大的(更新的)记录 existing.getCreateTime().after(replacement.getCreateTime()) ? existing : replacement )) .values().stream() .map(Report::getSerialNumber).collect(Collectors.toList()); Set deviceIdSet = new HashSet<>(deviceIds); log.info("报备设备数{}",deviceIdSet.size()); results = results.stream() .filter(result -> { // 获取当前对象的deviceId String resultDeviceId = result.getDeviceId(); // 过滤条件:deviceId不在集合中(注意处理null值,避免NPE) return resultDeviceId != null && !deviceIdSet.contains(resultDeviceId); }) .collect(Collectors.toList()); log.info("剩余过滤报备后设备数{}",results.size()); // 按区域划分 组装成map results.forEach(item -> { String areaCode = item.getArealayername().substring(0, 6); AreaDeptEnum areaDeptEnum = AreaDeptEnum.fromCode(areaCode); if (areaDeptEnum != null) { item.setArealayerno(areaDeptEnum.getCode()); } }); //需要添加数据库的数据集合 List demeritRecords = new ArrayList<>(); Map> groupByArealayerno = results.stream() .collect(Collectors.groupingBy(RecordMetaDSumResult::getArealayerno)); //按建设类型标签分组设备NO monitorConstructionList //按标签分组 Map> groupByTag = monitorConstructionList.stream() // 分组键:提取每个对象的 tag(注意处理 tag 为 null 的情况,避免键为 null) .collect(Collectors.groupingBy( mc -> mc.getTag() != null ? mc.getTag() : "DEFAULT_TAG", Collectors.mapping(MonitorConstruction::getSerialNumber, Collectors.toList()) )); List phaseOneTwoSerials = groupByTag.getOrDefault(ConstructionTypeEnum.PHASE_ONE_TWO.getDesc(), Collections.emptyList()); List phaseThreeSerials = groupByTag.getOrDefault(ConstructionTypeEnum.PHASE_THREE.getDesc(), Collections.emptyList()); List phaseFourthSerials = groupByTag.getOrDefault(ConstructionTypeEnum.PHASE_FOURTH.getDesc(), Collections.emptyList()); List checkEnterSichuan = groupByTag.getOrDefault(ConstructionTypeEnum.CHECK_ENTER_SICHUAN.getDesc(), Collections.emptyList()); List easternNewCity= groupByTag.getOrDefault(ConstructionTypeEnum.EASTERN_NEW_CITY.getDesc(), Collections.emptyList()); List yanTanPhaseTwoFace = groupByTag.getOrDefault(ConstructionTypeEnum.YAN_TAN_PHASE_TWO_FACE.getDesc(), Collections.emptyList()); //循环分组后的map for (Map.Entry> entry : groupByArealayerno.entrySet()) { String arealayerno = entry.getKey(); log.info("循环区域{}",arealayerno); //获得区 AreaDeptEnum areaDeptEnum = AreaDeptEnum.fromCode(arealayerno); if (areaDeptEnum == null) { log.info("区域数据异常,异常区域键值:{}",arealayerno); return; } List resultList = entry.getValue(); log.info("resultList:{}",resultList.size()); if (CollectionUtils.isNotEmpty(resultList)) { // 对每个List进行处理 分建类型处理集合 List phase_one_two = resultList.stream() .filter(result -> { String no = result.getNo(); return no != null && phaseOneTwoSerials.contains(no); }) .collect(Collectors.toList()); log.info("一二期考核记录数{}", phase_one_two.size()); List phase_three = resultList.stream() .filter(result -> { String no = result.getNo(); return no != null && phaseThreeSerials.contains(no); }) .collect(Collectors.toList()); log.info("三期考核记录数{}", phase_three.size()); List phase_fourth = resultList.stream() .filter(result -> { String no = result.getNo(); return no != null && phaseFourthSerials.contains(no); }) .collect(Collectors.toList()); log.info("四期考核记录数{}", phase_fourth.size()); List check_enter_sichuan = resultList.stream() .filter(result ->{ String no = result.getNo(); return no != null && checkEnterSichuan.contains(no); }) .collect(Collectors.toList()); log.info("入川即检{}", check_enter_sichuan.size()); List eastern_new_city = resultList.stream() .filter(result ->{ String no = result.getNo(); return no != null && easternNewCity.contains(no); }) .collect(Collectors.toList()); log.info("东部新城{}", eastern_new_city.size()); List yan_tan_phase_two_face = resultList.stream() .filter(result ->{ String no = result.getNo(); return no != null && yanTanPhaseTwoFace.contains(no); }) .collect(Collectors.toList()); log.info("沿滩二期人脸{}", yan_tan_phase_two_face.size()); //一二期 buildAndAddDemeritRecords(phase_one_two, ConstructionTypeEnum.PHASE_ONE_TWO.name(), areaDeptEnum.getDeptId(),yesterday,demeritRecords); //三期 buildAndAddDemeritRecords(phase_three, ConstructionTypeEnum.PHASE_THREE.name(), areaDeptEnum.getDeptId(),yesterday,demeritRecords); //四期 buildAndAddDemeritRecords(phase_fourth, ConstructionTypeEnum.PHASE_FOURTH.name(), areaDeptEnum.getDeptId(),yesterday,demeritRecords); //入川即检 buildAndAddDemeritRecords(check_enter_sichuan, ConstructionTypeEnum.CHECK_ENTER_SICHUAN.name(), areaDeptEnum.getDeptId(),yesterday,demeritRecords); //东部新城 buildAndAddDemeritRecords(eastern_new_city, ConstructionTypeEnum.EASTERN_NEW_CITY.name(), areaDeptEnum.getDeptId(),yesterday,demeritRecords); //沿滩二期人脸 buildAndAddDemeritRecords(yan_tan_phase_two_face, ConstructionTypeEnum.YAN_TAN_PHASE_TWO_FACE.name(), areaDeptEnum.getDeptId(),yesterday,demeritRecords); } } //处理完数据插入数据库中 //先删除需要插入时间是否存在数据 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.ge(DemeritRecord::getCreateTime,DateUtils.getDayStart(today)) .le(DemeritRecord::getCreateTime,DateUtils.getDayEnd(today)); demeritRecordService.remove(queryWrapper); demeritRecordService.saveBatch(demeritRecords); log.info("结束计算每日扣分记录情况:插入数据量{},数据信息:{}",demeritRecords.size(),demeritRecords); } public void buildAndAddDemeritRecords(List constructionByRecordMetaList, String constructionType,Integer areaDeptId,Date recordTime, List demeritRecords) { if (CollectionUtils.isNotEmpty(constructionByRecordMetaList)) { BigDecimal deduction = calculateTotalDeduction(constructionByRecordMetaList); DemeritRecord demeritRecord = buildDemeritRecord( constructionType, deduction, areaDeptId, DateUtils.getDayStart(recordTime)); demeritRecords.add(demeritRecord); }else{ DemeritRecord demeritRecord = buildDemeritRecord( constructionType, BigDecimal.ZERO, areaDeptId, DateUtils.getDayStart(recordTime)); demeritRecords.add(demeritRecord); } } }