package com.ycl.task;
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
|
import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper;
|
import com.ycl.common.enums.business.ProcessLogEventTypeEnum;
|
import com.ycl.common.enums.business.ProjectProcessTypeEnum;
|
import com.ycl.common.utils.DateUtils;
|
import com.ycl.domain.entity.ProcessCoding;
|
import com.ycl.domain.entity.ProcessLog;
|
import com.ycl.domain.entity.ProjectInfo;
|
import com.ycl.domain.entity.ProjectProcess;
|
import com.ycl.factory.FlowServiceFactory;
|
import com.ycl.mapper.ProcessCodingMapper;
|
import com.ycl.mapper.ProcessLogMapper;
|
import com.ycl.mapper.ProjectInfoMapper;
|
import com.ycl.mapper.ProjectProcessMapper;
|
import com.ycl.service.ProcessCodingService;
|
import com.ycl.service.ProcessLogService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.flowable.task.api.Task;
|
import org.flowable.task.api.TaskInfo;
|
import org.flowable.task.api.history.HistoricTaskInstance;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.function.Function;
|
import java.util.stream.Collectors;
|
|
import static com.ycl.common.constant.ProcessOverTimeConstants.*;
|
import static com.ycl.common.enums.business.ProcessLogEventTypeEnum.CANCEL_HANGUP;
|
import static com.ycl.common.enums.business.ProcessLogEventTypeEnum.HANGUP;
|
|
@Slf4j
|
@Component("flowableTask")
|
public class FlowableTask extends FlowServiceFactory {
|
@Autowired
|
private ProjectProcessMapper projectProcessMapper;
|
@Autowired
|
private ProjectInfoMapper projectInfoMapper;
|
@Autowired
|
private ProcessCodingMapper processCodingMapper;
|
@Autowired
|
private ProcessLogService processLogService;
|
@Autowired
|
private ProcessLogMapper processLogMapper;
|
|
/**
|
* 赋码任务
|
* 两个逻辑 改项目码、改节点颜色
|
*/
|
public void expireTask() {
|
log.info("开始定时任务:expireTask 赋码计算");
|
//当前正在运行的所有任务节点
|
List<Task> taskList = taskService.createTaskQuery().active().list();
|
log.info("查询到当前活跃任务数量: {}", taskList.size());
|
if (CollectionUtils.isEmpty(taskList)) {
|
log.info("活跃任务列表为空,结束任务");
|
return;
|
}
|
|
//排除掉节点挂起的任务
|
List<String> allHangupTask = processLogMapper.getAllHangup();
|
log.info("查询到挂起任务 ID 数量: {}", allHangupTask.size());
|
|
List<Task> filteredTaskList = taskList.stream()
|
.filter(task -> {
|
boolean isHangup = allHangupTask.contains(task.getId());
|
if (isHangup) {
|
log.info("任务 [{}] 因处于挂起状态被跳过", task.getId());
|
}
|
return !isHangup;
|
})
|
.collect(Collectors.toList());
|
|
log.info("排除挂起任务后,待处理任务数量: {}", filteredTaskList.size());
|
|
List<String> taskIds = filteredTaskList
|
.stream().map(TaskInfo::getId).collect(Collectors.toList());
|
//查询节点挂起日志
|
Map<String, List<ProcessLog>> hangupLogMap = new LambdaQueryChainWrapper<>(processLogMapper)
|
.in(ProcessLog::getEventType, HANGUP, CANCEL_HANGUP)
|
.in(ProcessLog::getTaskId, taskIds)
|
.list()
|
.stream()
|
.collect(Collectors.groupingBy(ProcessLog::getTaskId));
|
|
//需要监控的赋码任务
|
List<ProcessCoding> processCodingList = new LambdaQueryChainWrapper<>(processCodingMapper)
|
.in(ProcessCoding::getTaskId, taskIds)
|
.list();
|
|
log.info("开始处理赋码计算逻辑。当前待处理活跃任务数: {}, 已配置监控的任务数: {}", taskIds.size(), processCodingList.size());
|
|
Map<String, ProcessCoding> taskMap = new HashMap<>();
|
Map<String, Date> startTaskMap = new HashMap<>();
|
if (!CollectionUtils.isEmpty(processCodingList)) {
|
//key为taskId value为本体对象
|
taskMap = processCodingList.stream().collect(Collectors.toMap(ProcessCoding::getTaskId, Function.identity()));
|
//拿到开始计时的节点集合 key:taskId value:开始时间
|
startTaskMap = getStartTaskList(processCodingList);
|
log.info("获取到计时起点记录数量: {}", startTaskMap.size());
|
}
|
|
//提前准备接收数据的map key:流程实例id value:需要改变的颜色
|
Map<String, List<String>> map = new HashMap<>();
|
List<ProcessCoding> list = new ArrayList<>();
|
map.put(GREEN, new ArrayList<>());
|
map.put(RED, new ArrayList<>());
|
map.put(YELLOW, new ArrayList<>());
|
Date now = new Date();
|
|
//遍历所有代办的节点
|
for (Task task : filteredTaskList) {
|
String taskId = task.getId();
|
String procInsId = task.getProcessInstanceId();
|
ProcessCoding processCoding = taskMap.get(taskId);
|
|
if (processCoding == null) {
|
//不需要监控的任务节点项目码直接改为绿色
|
log.info("任务 ID: {}, 流程实例: {}, 名称: {} 未在监控配置中,项目码默认设为 GREEN", taskId, procInsId, task.getName());
|
map.get(GREEN).add(procInsId);
|
continue;
|
}
|
|
//判断赋码统一用秒作为单位,且只需用红码时间判断超时,通过超时去改变项目的赋码状态,节点本身无赋码状态
|
Date startTime = startTaskMap.get(processCoding.getStartTaskId());
|
if (startTime == null) {
|
log.warn("任务 ID: {}, 流程实例: {} 找不到计时起点时间 (StartTaskId: {}), 将默认归为 GREEN 以防止任务丢失", taskId, procInsId, processCoding.getStartTaskId());
|
map.get(GREEN).add(procInsId);
|
continue;
|
}
|
|
try {
|
Long redTime = getTime(processCoding.getRedTime());
|
//节点处理时间,需排除节假日
|
long durationTime = DateUtils.getWorkingSed(startTime, now);
|
|
//减去节点挂起时长
|
long finalDurationTime = subNodeHangupTime(hangupLogMap, task, durationTime);
|
|
String status = GREEN; // 默认状态为绿色
|
String overtimeStatus = NORMAL;
|
Long overtimeDurationSec = null; // 超时时长(秒,原始值)
|
Double overtimeDurationHour = 0.0; // 超时时长(小时,转换后)
|
|
if (redTime != null && redTime != 0 && finalDurationTime >= redTime) {
|
status = RED; // 如果超过红色时间阈值,则表明该任务超时
|
overtimeStatus = OVERTIME;
|
overtimeDurationSec = finalDurationTime - redTime;
|
overtimeDurationHour = Math.round((overtimeDurationSec / 3600.0) * 10) / 10.0;
|
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
log.info("任务超时预警:任务ID[{}], 流程实例[{}], 起点[{}], 当前[{}], 红码阈值[{}小时], 实际耗时[{}小时], 超时[{}小时]",
|
taskId, procInsId,
|
sdf.format(startTime), sdf.format(now),
|
Math.round((redTime / 3600.0) * 10) / 10.0,
|
Math.round((finalDurationTime / 3600.0) * 10) / 10.0,
|
overtimeDurationHour);
|
} else {
|
log.info("任务未超时:任务ID[{}], 流程实例[{}], 实际耗时[{}小时], 红码阈值[{}小时]",
|
taskId, procInsId,
|
Math.round((finalDurationTime / 3600.0) * 10) / 10.0,
|
redTime != null ? Math.round((redTime / 3600.0) * 10) / 10.0 : 0);
|
}
|
|
map.get(status).add(procInsId);
|
processCoding.setOverTimeTotal(String.valueOf(overtimeDurationHour));
|
processCoding.setStatus(status);
|
processCoding.setOvertimeStatus(overtimeStatus);
|
processCoding.setStartTaskTime(task.getCreateTime());
|
list.add(processCoding);
|
} catch (Exception e) {
|
log.error("任务 [{}] 赋码计算发生异常: {}", taskId, e.getMessage(), e);
|
}
|
}
|
//更新项目码
|
map.forEach((key, value) -> {
|
if (!value.isEmpty()) {
|
log.info("准备更新项目码为 {}: 流程实例列表 {}", key, value);
|
updateProjectCoding(value, key);
|
}
|
});
|
//更新节点状态 自定义的mybatis方法
|
if (!CollectionUtils.isEmpty(list)) {
|
log.info("批量更新节点状态数量: {}", list.size());
|
processCodingMapper.updateBatch(list);
|
}
|
|
log.info("定时任务:expireTask 赋码计算结束");
|
}
|
|
//减去节点挂起时长
|
private long subNodeHangupTime(Map<String, List<ProcessLog>> hangupLogMap, Task task, long durationTime) {
|
List<ProcessLog> processLogs = hangupLogMap.get(task.getId());
|
if (!CollectionUtils.isEmpty(processLogs)) {
|
long hangupTime = 0;
|
//分组分为挂起和取消挂起
|
Map<ProcessLogEventTypeEnum, List<ProcessLog>> logEventTypeMap = processLogs.stream()
|
.sorted(Comparator.comparing(ProcessLog::getGmtCreate))
|
.collect(Collectors.groupingBy(ProcessLog::getEventType));
|
List<ProcessLog> cancelHangup = logEventTypeMap.get(CANCEL_HANGUP);
|
for (int i = 0; i < cancelHangup.size(); i++) {
|
ProcessLog processLog = cancelHangup.get(i);
|
hangupTime += processLog.getGmtCreate().getTime() - logEventTypeMap.get(HANGUP).get(i).getGmtCreate().getTime();
|
}
|
durationTime = durationTime - hangupTime;
|
}
|
return durationTime;
|
}
|
|
private Long getTime(String timeStr) {
|
Long time = null;
|
if (StringUtils.isNotBlank(timeStr)) {
|
String[] timeArr = timeStr.split("-");
|
// 解析天数和小时数
|
int days = Integer.parseInt(timeArr[0]);
|
int hours = 0;
|
if (timeArr.length > 1) {
|
hours = Integer.parseInt(timeArr[1]);
|
}
|
time = (days * 24L + hours) * 3600L;
|
// //分-秒
|
// time= (days * 60L) + hours;
|
}
|
return time;
|
}
|
|
private Map<String, Date> getStartTaskList(List<ProcessCoding> processCodingList) {
|
//查出任务计时起始节点集合
|
List<String> startTaskIds = processCodingList.stream().map(ProcessCoding::getStartTaskId).collect(Collectors.toList());
|
//查出起始计时节点数据
|
Map<String, Date> startDateMap = new HashMap<>();
|
List<HistoricTaskInstance> hisStartTasks = historyService.createHistoricTaskInstanceQuery().taskIds(startTaskIds).list();
|
if (!CollectionUtils.isEmpty(hisStartTasks)) {
|
hisStartTasks.forEach(hisTask -> {
|
startDateMap.put(hisTask.getId(), hisTask.getStartTime());
|
});
|
}
|
return startDateMap;
|
}
|
|
/**
|
* 赋码
|
*
|
* @param processInstanceIds 流程实例ID列表
|
* @param coding 赋码值
|
*/
|
private void updateProjectCoding(List<String> processInstanceIds, String coding) {
|
if (CollectionUtils.isEmpty(processInstanceIds)) {
|
return;
|
}
|
|
List<ProjectProcess> projectProcesses = new LambdaQueryChainWrapper<>(projectProcessMapper)
|
.in(ProjectProcess::getProcessInsId, processInstanceIds)
|
.eq(ProjectProcess::getProjectType, ProjectProcessTypeEnum.PROJECT)
|
.list();
|
|
log.info("赋码类型 [{}]: 查询到 {} 个对应的项目流程关系", coding, projectProcesses.size());
|
|
List<String> projectIds = projectProcesses.stream()
|
.map(p -> {
|
log.info("流程实例 [{}] 对应项目 ID [{}]", p.getProcessInsId(), p.getProjectId());
|
return p.getProjectId();
|
})
|
.collect(Collectors.toList());
|
|
if (!CollectionUtils.isEmpty(projectIds)) {
|
log.info("执行数据库更新: 将项目 {} 的编码更新为 {}", projectIds, coding);
|
new LambdaUpdateChainWrapper<>(projectInfoMapper)
|
.in(ProjectInfo::getId, projectIds)
|
.set(ProjectInfo::getCoding, coding)
|
.update();
|
} else {
|
log.warn("未找到对应的项目 ID,无法执行编码更新。流程实例列表: {}", processInstanceIds);
|
}
|
}
|
}
|