package com.ycl.task; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; import com.ycl.common.enums.business.ProcessLogEventTypeEnum; import com.ycl.common.enums.business.ProjectProcessTypeEnum; import com.ycl.common.utils.DateUtils; import com.ycl.domain.entity.ProcessCoding; import com.ycl.domain.entity.ProcessLog; import com.ycl.domain.entity.ProjectInfo; import com.ycl.domain.entity.ProjectProcess; import com.ycl.factory.FlowServiceFactory; import com.ycl.mapper.ProcessCodingMapper; import com.ycl.mapper.ProcessLogMapper; import com.ycl.mapper.ProjectInfoMapper; import com.ycl.mapper.ProjectProcessMapper; import com.ycl.service.ProcessCodingService; import com.ycl.service.ProcessLogService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.flowable.task.api.Task; import org.flowable.task.api.TaskInfo; import org.flowable.task.api.history.HistoricTaskInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.text.SimpleDateFormat; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; import static com.ycl.common.constant.ProcessOverTimeConstants.*; import static com.ycl.common.enums.business.ProcessLogEventTypeEnum.CANCEL_HANGUP; import static com.ycl.common.enums.business.ProcessLogEventTypeEnum.HANGUP; @Slf4j @Component("flowableTask") public class FlowableTask extends FlowServiceFactory { @Autowired private ProjectProcessMapper projectProcessMapper; @Autowired private ProjectInfoMapper projectInfoMapper; @Autowired private ProcessCodingMapper processCodingMapper; @Autowired private ProcessLogService processLogService; @Autowired private ProcessLogMapper processLogMapper; /** * 赋码任务 * 两个逻辑 改项目码、改节点颜色 */ public void expireTask() { log.info("开始定时任务:expireTask 赋码计算"); //当前正在运行的所有任务节点 List taskList = taskService.createTaskQuery().active().list(); log.info("查询到当前活跃任务数量: {}", taskList.size()); if (CollectionUtils.isEmpty(taskList)) { log.info("活跃任务列表为空,结束任务"); return; } //排除掉节点挂起的任务 List allHangupTask = processLogMapper.getAllHangup(); log.info("查询到挂起任务 ID 数量: {}", allHangupTask.size()); List filteredTaskList = taskList.stream() .filter(task -> { boolean isHangup = allHangupTask.contains(task.getId()); if (isHangup) { log.info("任务 [{}] 因处于挂起状态被跳过", task.getId()); } return !isHangup; }) .collect(Collectors.toList()); log.info("排除挂起任务后,待处理任务数量: {}", filteredTaskList.size()); List taskIds = filteredTaskList .stream().map(TaskInfo::getId).collect(Collectors.toList()); //查询节点挂起日志 Map> hangupLogMap = new LambdaQueryChainWrapper<>(processLogMapper) .in(ProcessLog::getEventType, HANGUP, CANCEL_HANGUP) .in(ProcessLog::getTaskId, taskIds) .list() .stream() .collect(Collectors.groupingBy(ProcessLog::getTaskId)); //需要监控的赋码任务 List processCodingList = new LambdaQueryChainWrapper<>(processCodingMapper) .in(ProcessCoding::getTaskId, taskIds) .list(); log.info("开始处理赋码计算逻辑。当前待处理活跃任务数: {}, 已配置监控的任务数: {}", taskIds.size(), processCodingList.size()); Map taskMap = new HashMap<>(); Map startTaskMap = new HashMap<>(); if (!CollectionUtils.isEmpty(processCodingList)) { //key为taskId value为本体对象 taskMap = processCodingList.stream().collect(Collectors.toMap(ProcessCoding::getTaskId, Function.identity())); //拿到开始计时的节点集合 key:taskId value:开始时间 startTaskMap = getStartTaskList(processCodingList); log.info("获取到计时起点记录数量: {}", startTaskMap.size()); } //提前准备接收数据的map key:流程实例id value:需要改变的颜色 Map> map = new HashMap<>(); List list = new ArrayList<>(); map.put(GREEN, new ArrayList<>()); map.put(RED, new ArrayList<>()); map.put(YELLOW, new ArrayList<>()); Date now = new Date(); //遍历所有代办的节点 for (Task task : filteredTaskList) { String taskId = task.getId(); String procInsId = task.getProcessInstanceId(); ProcessCoding processCoding = taskMap.get(taskId); if (processCoding == null) { //不需要监控的任务节点项目码直接改为绿色 log.info("任务 ID: {}, 流程实例: {}, 名称: {} 未在监控配置中,项目码默认设为 GREEN", taskId, procInsId, task.getName()); map.get(GREEN).add(procInsId); continue; } //判断赋码统一用秒作为单位,且只需用红码时间判断超时,通过超时去改变项目的赋码状态,节点本身无赋码状态 Date startTime = startTaskMap.get(processCoding.getStartTaskId()); if (startTime == null) { log.warn("任务 ID: {}, 流程实例: {} 找不到计时起点时间 (StartTaskId: {}), 将默认归为 GREEN 以防止任务丢失", taskId, procInsId, processCoding.getStartTaskId()); map.get(GREEN).add(procInsId); continue; } try { Long redTime = getTime(processCoding.getRedTime()); //节点处理时间,需排除节假日 long durationTime = DateUtils.getWorkingSed(startTime, now); //减去节点挂起时长 long finalDurationTime = subNodeHangupTime(hangupLogMap, task, durationTime); String status = GREEN; // 默认状态为绿色 String overtimeStatus = NORMAL; Long overtimeDurationSec = null; // 超时时长(秒,原始值) Double overtimeDurationHour = 0.0; // 超时时长(小时,转换后) if (redTime != null && redTime != 0 && finalDurationTime >= redTime) { status = RED; // 如果超过红色时间阈值,则表明该任务超时 overtimeStatus = OVERTIME; overtimeDurationSec = finalDurationTime - redTime; overtimeDurationHour = Math.round((overtimeDurationSec / 3600.0) * 10) / 10.0; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("任务超时预警:任务ID[{}], 流程实例[{}], 起点[{}], 当前[{}], 红码阈值[{}小时], 实际耗时[{}小时], 超时[{}小时]", taskId, procInsId, sdf.format(startTime), sdf.format(now), Math.round((redTime / 3600.0) * 10) / 10.0, Math.round((finalDurationTime / 3600.0) * 10) / 10.0, overtimeDurationHour); } else { log.info("任务未超时:任务ID[{}], 流程实例[{}], 实际耗时[{}小时], 红码阈值[{}小时]", taskId, procInsId, Math.round((finalDurationTime / 3600.0) * 10) / 10.0, redTime != null ? Math.round((redTime / 3600.0) * 10) / 10.0 : 0); } map.get(status).add(procInsId); processCoding.setOverTimeTotal(String.valueOf(overtimeDurationHour)); processCoding.setStatus(status); processCoding.setOvertimeStatus(overtimeStatus); processCoding.setStartTaskTime(task.getCreateTime()); list.add(processCoding); } catch (Exception e) { log.error("任务 [{}] 赋码计算发生异常: {}", taskId, e.getMessage(), e); } } //更新项目码 map.forEach((key, value) -> { if (!value.isEmpty()) { log.info("准备更新项目码为 {}: 流程实例列表 {}", key, value); updateProjectCoding(value, key); } }); //更新节点状态 自定义的mybatis方法 if (!CollectionUtils.isEmpty(list)) { log.info("批量更新节点状态数量: {}", list.size()); processCodingMapper.updateBatch(list); } log.info("定时任务:expireTask 赋码计算结束"); } //减去节点挂起时长 private long subNodeHangupTime(Map> hangupLogMap, Task task, long durationTime) { List processLogs = hangupLogMap.get(task.getId()); if (!CollectionUtils.isEmpty(processLogs)) { long hangupTime = 0; //分组分为挂起和取消挂起 Map> logEventTypeMap = processLogs.stream() .sorted(Comparator.comparing(ProcessLog::getGmtCreate)) .collect(Collectors.groupingBy(ProcessLog::getEventType)); List cancelHangup = logEventTypeMap.get(CANCEL_HANGUP); for (int i = 0; i < cancelHangup.size(); i++) { ProcessLog processLog = cancelHangup.get(i); hangupTime += processLog.getGmtCreate().getTime() - logEventTypeMap.get(HANGUP).get(i).getGmtCreate().getTime(); } durationTime = durationTime - hangupTime; } return durationTime; } private Long getTime(String timeStr) { Long time = null; if (StringUtils.isNotBlank(timeStr)) { String[] timeArr = timeStr.split("-"); // 解析天数和小时数 int days = Integer.parseInt(timeArr[0]); int hours = 0; if (timeArr.length > 1) { hours = Integer.parseInt(timeArr[1]); } time = (days * 24L + hours) * 3600L; // //分-秒 // time= (days * 60L) + hours; } return time; } private Map getStartTaskList(List processCodingList) { //查出任务计时起始节点集合 List startTaskIds = processCodingList.stream().map(ProcessCoding::getStartTaskId).collect(Collectors.toList()); //查出起始计时节点数据 Map startDateMap = new HashMap<>(); List hisStartTasks = historyService.createHistoricTaskInstanceQuery().taskIds(startTaskIds).list(); if (!CollectionUtils.isEmpty(hisStartTasks)) { hisStartTasks.forEach(hisTask -> { startDateMap.put(hisTask.getId(), hisTask.getStartTime()); }); } return startDateMap; } /** * 赋码 * * @param processInstanceIds 流程实例ID列表 * @param coding 赋码值 */ private void updateProjectCoding(List processInstanceIds, String coding) { if (CollectionUtils.isEmpty(processInstanceIds)) { return; } List projectProcesses = new LambdaQueryChainWrapper<>(projectProcessMapper) .in(ProjectProcess::getProcessInsId, processInstanceIds) .eq(ProjectProcess::getProjectType, ProjectProcessTypeEnum.PROJECT) .list(); log.info("赋码类型 [{}]: 查询到 {} 个对应的项目流程关系", coding, projectProcesses.size()); List projectIds = projectProcesses.stream() .map(p -> { log.info("流程实例 [{}] 对应项目 ID [{}]", p.getProcessInsId(), p.getProjectId()); return p.getProjectId(); }) .collect(Collectors.toList()); if (!CollectionUtils.isEmpty(projectIds)) { log.info("执行数据库更新: 将项目 {} 的编码更新为 {}", projectIds, coding); new LambdaUpdateChainWrapper<>(projectInfoMapper) .in(ProjectInfo::getId, projectIds) .set(ProjectInfo::getCoding, coding) .update(); } else { log.warn("未找到对应的项目 ID,无法执行编码更新。流程实例列表: {}", processInstanceIds); } } }