package com.ycl.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.google.common.util.concurrent.AtomicDouble; import com.ycl.platform.domain.entity.DailyMonitorDemeritRecord; import com.ycl.platform.domain.entity.DemeritRecord; import com.ycl.platform.domain.entity.MonitorConstruction; import com.ycl.platform.domain.entity.Report; import com.ycl.platform.domain.result.UY.RecordMetaDSumResult; import com.ycl.platform.mapper.DemeritRecordMapper; import com.ycl.platform.mapper.IMonitorConstructionMapper; import com.ycl.platform.mapper.ReportMapper; import com.ycl.platform.service.IDailyMonitorDemeritRecordService; import com.ycl.platform.service.IDemeritRecordService; import com.ycl.utils.DateUtils; import enumeration.ConstructionTypeEnum; import enumeration.general.AreaDeptEnum; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Component; import utils.StringUtils; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; /** * zgyw * * @author : zxl * @date : 2025-09-15 17:32 **/ @Slf4j @RequiredArgsConstructor @Component("demeritRecordTask") public class DemeritRecordTask { private final MongoTemplate mongoTemplate; private final ReportMapper reportMapper; private final IMonitorConstructionMapper monitorConstructionMapper; private final IDailyMonitorDemeritRecordService dailyMonitorDemeritRecordService; private static final ExecutorService executorService = new ThreadPoolExecutor(16, 128, 5000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); private final IDemeritRecordService demeritRecordService; private final IDailyMonitorDemeritRecordService iDailyMonitorDemeritRecordService; private DemeritRecord buildDemeritRecord(String constructionType,BigDecimal demerit,Integer deptId,Date recordTime) { DemeritRecord record = new DemeritRecord(); record.setConstructionType(constructionType); record.setDemerit(demerit); record.setDeptId(deptId); record.setRecordTime(recordTime); return record; } private DailyMonitorDemeritRecord buildDailyMonitorDemeritRecord(Date recordTime,String deviceId,String constructionType,BigDecimal demerit,Integer deptId,Boolean isReport,String deviceName) { DailyMonitorDemeritRecord demeritRecord = new DailyMonitorDemeritRecord(); demeritRecord.setRecordTime(recordTime); demeritRecord.setDemerit(demerit); demeritRecord.setConstructionType(constructionType); demeritRecord.setSerialNumber(deviceId); demeritRecord.setIsReport(isReport); demeritRecord.setDeptId(deptId); demeritRecord.setDeviceName(deviceName); return demeritRecord; } private BigDecimal calculateTotalDeduction(List records,Date recordTime,Integer deptId,List dailyMonitorDemeritRecords) { BigDecimal total = BigDecimal.ZERO; // 单线程循环,无需AtomicReference for (RecordMetaDSumResult record : records) { // 安全处理空值:若missDuration为null,默认按0分钟处理 Double missDurationHours = record.getMissDuration(); double missDurationMinutes = Optional.ofNullable(missDurationHours) .map(hours -> hours * 60.0) // 小时转分钟 .orElse(0.0); // 空值默认0分钟 // 计算单条记录的扣分 BigDecimal deduction; if (missDurationMinutes > 720) { deduction = new BigDecimal("2.0"); } else if (missDurationMinutes > 240) { deduction = new BigDecimal("1.5"); } else if (missDurationMinutes > 60) { deduction = new BigDecimal("1.0"); } else if (missDurationMinutes > 30) { deduction = new BigDecimal("0.5"); } else { deduction = BigDecimal.ZERO; } dailyMonitorDemeritRecords.add(buildDailyMonitorDemeritRecord(recordTime,record.getDeviceId() ,record.getConstructionType(),deduction,deptId,false,record.getDeviceName())); // 累加总扣分(最后统一处理精度,避免中间四舍五入误差) total = total.add(deduction); } // 最终统一保留1位小数,四舍五入 return total.setScale(1, RoundingMode.HALF_UP); } public void run(){ log.info("开始执行计算每日扣分记录情况"); //获得mongodb中所有全景的设备 //mongodb查询条件 Date today =new Date(); // Calendar calendar = Calendar.getInstance(); // calendar.setTime(today); // calendar.add(Calendar.DAY_OF_YEAR, -2); // 减去2天 测试用 // Date twoDaysAgo = calendar.getTime(); //计算录像可用率和重点录像可用率 Query query = new Query(); query.addCriteria(Criteria .where("mongoCreateTime").gte(DateUtils.getDayStart(today)).lt(DateUtils.getDayEnd(today))); List results = mongoTemplate.find(query, RecordMetaDSumResult.class); log.info("日期:{},查询出的设备录像记录数{}",today,results.size()); //过滤掉非全景的设备 且 1则为细节,如果是0则为全景 results = results.stream().filter(obj -> { String deviceId = obj.getDeviceId(); if (deviceId == null || deviceId.length() < 7) { return false; // 去除元素 } // 获取倒数第七位的字符 char seventhFromEnd = deviceId.charAt(deviceId.length() - 7); return seventhFromEnd != '1'; //为1 则会 false 去除掉 }).collect(Collectors.toList()); log.info("过滤后剩余全景设备数{}",results.size()); //只考核LT_、(三期) // DX_ 、(一二期) // DX_R、(四区人脸) // DX_RS、(四区人脸) // (需要排除DX_R2、DX_RD、J_、T1、T3以及没有前缀的设备) // List prefixes = Arrays.asList("LT_", "DX_", "DX_R", "DX_RS"); //查询设备标签表中的MonitorConstruction列表 List monitorConstructionList = new LambdaQueryChainWrapper<>(monitorConstructionMapper) .eq(MonitorConstruction::getDeleted, Boolean.FALSE) .list(); //按编号分组,值为设备对应标签 Map serialTagMap = monitorConstructionList.stream() .filter(mc -> mc.getSerialNumber() != null) // 过滤序列号为空的无效数据 .collect(Collectors.toMap( MonitorConstruction::getSerialNumber, MonitorConstruction::getTag, (oldVal, newVal) -> newVal )); //过滤掉没有标签的集合 并将标签赋值给录像情况集合 results = results.stream() .filter(result -> { String sn = result.getNo(); // 过滤条件:序列号非空 + 在标签Map中存在(即有对应标签) boolean isMatch = sn != null && serialTagMap.containsKey(sn); if (isMatch) { // 匹配成功,将标签赋值给result(需确保RecordMetaDSumResult有setTag()方法) String tag = serialTagMap.get(sn); result.setConstructionType(tag); // 关键:赋值标签 } return isMatch; // 只保留有标签的result }) .collect(Collectors.toList()); log.info("有标签的设备记录集合大小:{}",results.size());//这是需要计算扣分的录像数据 //过滤掉报备的设备 //查询在当前时间有报备的所有设备, //因为录像数据的时间 Date yesterday =new Date(); Calendar calendar = Calendar.getInstance(); calendar.setTime(yesterday); calendar.add(Calendar.DAY_OF_YEAR, -1); // 减去1天 测试用 yesterday = calendar.getTime(); log.info("测试时间:{}",yesterday); List list = new LambdaQueryChainWrapper<>(reportMapper) .eq(Report::getStatus, 1) // getEndCreateTime getEndCreateTime 01 00:00:00 - 30 11:59:59 .le(Report::getBeginCreateTime, DateUtils.getDayStart(yesterday)) //>= .ge(Report::getEndCreateTime, DateUtils.getDayEnd(yesterday)) .list(); log.info("报备记录:{}",list); List deviceIds = list.stream() .collect(Collectors.toMap( Report::getSerialNumber, // key: serialNumber Function.identity(), // value: Report对象本身 (existing, replacement) -> // 当key冲突时,保留createTime更大的(更新的)记录 existing.getCreateTime().after(replacement.getCreateTime()) ? existing : replacement )) .values().stream() .map(Report::getSerialNumber).collect(Collectors.toList()); Set deviceIdSet = new HashSet<>(deviceIds); log.info("报备设备数{}",deviceIdSet.size()); List dailyMonitorDemeritRecords = new ArrayList<>(); Date yesterdayBegin = DateUtils.getDayStart(yesterday); // 遍历区分对象的报备状态 // 因为下面会过滤覆盖掉考核设备,需要已报备的设备录像情况信息, // 所以此处应该将已报备的设备录像情况信息添加到每日扣分详情记录中并初始化好 后续不在处理直接添加数据库中 // 将区域信息 放入集合中 results.forEach(item -> { String areaCode = item.getArealayername().substring(0, 6); AreaDeptEnum areaDeptEnum = AreaDeptEnum.fromCode(areaCode); if (areaDeptEnum != null) { item.setArealayerno(areaDeptEnum.getCode()); } }); results.forEach(result ->{ String deviceId = result.getDeviceId(); if (StringUtils.isNotBlank(deviceId)) { if (deviceIdSet.contains(deviceId)) { //已报备设备记录 AreaDeptEnum areaDeptEnum = AreaDeptEnum.fromCode(result.getArealayerno()); if (areaDeptEnum == null) { log.info("区域数据异常,异常区域键值:{}",result.getArealayerno()); return; } DailyMonitorDemeritRecord demeritRecord = buildDailyMonitorDemeritRecord( yesterdayBegin,deviceId, result.getConstructionType(), BigDecimal.ZERO, areaDeptEnum.getDeptId(), true, result.getDeviceName() ); dailyMonitorDemeritRecords.add(demeritRecord); } } }); // 过滤获得未报备集合 results = results.stream() .filter(result -> { String resultDeviceId = result.getDeviceId(); return resultDeviceId != null && !deviceIdSet.contains(resultDeviceId); }) .collect(Collectors.toList()); log.info("剩余过滤报备后设备数{}",results.size()); //需要添加数据库的数据集合 List demeritRecords = new ArrayList<>(); // 按区域划分 组装成map Map> groupByArealayerno = results.stream() .collect(Collectors.groupingBy(RecordMetaDSumResult::getArealayerno)); //按建设类型标签分组设备NO monitorConstructionList //按标签分组 Map> groupByTag = monitorConstructionList.stream() // 分组键:提取每个对象的 tag(注意处理 tag 为 null 的情况,避免键为 null) .collect(Collectors.groupingBy( mc -> mc.getTag() != null ? mc.getTag() : "DEFAULT_TAG", Collectors.mapping(MonitorConstruction::getSerialNumber, Collectors.toList()) )); List phaseOneTwoSerials = groupByTag.getOrDefault(ConstructionTypeEnum.PHASE_ONE_TWO.getDesc(), Collections.emptyList()); List phaseThreeSerials = groupByTag.getOrDefault(ConstructionTypeEnum.PHASE_THREE.getDesc(), Collections.emptyList()); List phaseFourthSerials = groupByTag.getOrDefault(ConstructionTypeEnum.PHASE_FOURTH.getDesc(), Collections.emptyList()); List checkEnterSichuan = groupByTag.getOrDefault(ConstructionTypeEnum.CHECK_ENTER_SICHUAN.getDesc(), Collections.emptyList()); List easternNewCity= groupByTag.getOrDefault(ConstructionTypeEnum.EASTERN_NEW_CITY.getDesc(), Collections.emptyList()); List yanTanPhaseTwoFace = groupByTag.getOrDefault(ConstructionTypeEnum.YAN_TAN_PHASE_TWO_FACE.getDesc(), Collections.emptyList()); //循环分组后的map for (Map.Entry> entry : groupByArealayerno.entrySet()) { String arealayerno = entry.getKey(); log.info("循环区域{}",arealayerno); //获得区 AreaDeptEnum areaDeptEnum = AreaDeptEnum.fromCode(arealayerno); if (areaDeptEnum == null) { log.info("区域数据异常,异常区域键值:{}",arealayerno); return; } List resultList = entry.getValue(); log.info("resultList:{}",resultList.size()); if (CollectionUtils.isNotEmpty(resultList)) { // 对每个List进行处理 分建类型处理集合 List phase_one_two = resultList.stream() .filter(result -> { String no = result.getNo(); return no != null && phaseOneTwoSerials.contains(no); }) .collect(Collectors.toList()); log.info("一二期考核记录数{}", phase_one_two.size()); List phase_three = resultList.stream() .filter(result -> { String no = result.getNo(); return no != null && phaseThreeSerials.contains(no); }) .collect(Collectors.toList()); log.info("三期考核记录数{}", phase_three.size()); List phase_fourth = resultList.stream() .filter(result -> { String no = result.getNo(); return no != null && phaseFourthSerials.contains(no); }) .collect(Collectors.toList()); log.info("四期考核记录数{}", phase_fourth.size()); List check_enter_sichuan = resultList.stream() .filter(result ->{ String no = result.getNo(); return no != null && checkEnterSichuan.contains(no); }) .collect(Collectors.toList()); log.info("入川即检{}", check_enter_sichuan.size()); List eastern_new_city = resultList.stream() .filter(result ->{ String no = result.getNo(); return no != null && easternNewCity.contains(no); }) .collect(Collectors.toList()); log.info("东部新城{}", eastern_new_city.size()); List yan_tan_phase_two_face = resultList.stream() .filter(result ->{ String no = result.getNo(); return no != null && yanTanPhaseTwoFace.contains(no); }) .collect(Collectors.toList()); log.info("沿滩二期人脸{}", yan_tan_phase_two_face.size()); //一二期 buildAndAddDemeritRecords(phase_one_two, ConstructionTypeEnum.PHASE_ONE_TWO.name(), areaDeptEnum.getDeptId(),yesterdayBegin,demeritRecords,dailyMonitorDemeritRecords); //三期 buildAndAddDemeritRecords(phase_three, ConstructionTypeEnum.PHASE_THREE.name(), areaDeptEnum.getDeptId(),yesterdayBegin,demeritRecords,dailyMonitorDemeritRecords); //四期 buildAndAddDemeritRecords(phase_fourth, ConstructionTypeEnum.PHASE_FOURTH.name(), areaDeptEnum.getDeptId(),yesterdayBegin,demeritRecords,dailyMonitorDemeritRecords); //入川即检 buildAndAddDemeritRecords(check_enter_sichuan, ConstructionTypeEnum.CHECK_ENTER_SICHUAN.name(), areaDeptEnum.getDeptId(),yesterdayBegin,demeritRecords,dailyMonitorDemeritRecords); //东部新城 buildAndAddDemeritRecords(eastern_new_city, ConstructionTypeEnum.EASTERN_NEW_CITY.name(), areaDeptEnum.getDeptId(),yesterdayBegin,demeritRecords,dailyMonitorDemeritRecords); //沿滩二期人脸 buildAndAddDemeritRecords(yan_tan_phase_two_face, ConstructionTypeEnum.YAN_TAN_PHASE_TWO_FACE.name(), areaDeptEnum.getDeptId(),yesterdayBegin,demeritRecords,dailyMonitorDemeritRecords); } } //处理完数据插入数据库中 //先删除需要插入时间是否存在数据 LambdaQueryWrapper demeritRecordLambdaQueryWrapper = new LambdaQueryWrapper<>(); demeritRecordLambdaQueryWrapper.ge(DemeritRecord::getCreateTime,DateUtils.getDayStart(today)) .le(DemeritRecord::getCreateTime,DateUtils.getDayEnd(today)); demeritRecordService.remove(demeritRecordLambdaQueryWrapper); demeritRecordService.saveBatch(demeritRecords); log.info("结束计算每日扣分记录情况:插入数据量{},数据信息:{}",demeritRecords.size(),demeritRecords); //填充设备录像情况扣分详情结果 LambdaQueryWrapper dailyMonitorDemeritRecordLambdaQueryWrapper = new LambdaQueryWrapper<>(); dailyMonitorDemeritRecordLambdaQueryWrapper.ge(DailyMonitorDemeritRecord::getCreateTime,DateUtils.getDayStart(today)) .le(DailyMonitorDemeritRecord::getCreateTime,DateUtils.getDayEnd(today)); iDailyMonitorDemeritRecordService.remove(dailyMonitorDemeritRecordLambdaQueryWrapper); iDailyMonitorDemeritRecordService.saveBatch(dailyMonitorDemeritRecords); log.info("结束计算每日扣分记录详情情况:插入数据量{},数据信息:{}",dailyMonitorDemeritRecords.size(),dailyMonitorDemeritRecords); } public void buildAndAddDemeritRecords(List constructionByRecordMetaList, String constructionType,Integer areaDeptId,Date recordTime, List demeritRecords, List dailyMonitorDemeritRecords) { if (CollectionUtils.isNotEmpty(constructionByRecordMetaList)) { BigDecimal deduction = calculateTotalDeduction(constructionByRecordMetaList,recordTime,areaDeptId,dailyMonitorDemeritRecords); DemeritRecord demeritRecord = buildDemeritRecord( constructionType, deduction, areaDeptId, recordTime); demeritRecords.add(demeritRecord); }else{ DemeritRecord demeritRecord = buildDemeritRecord( constructionType, BigDecimal.ZERO, areaDeptId, recordTime); demeritRecords.add(demeritRecord); } } }