zxl
8 小时以前 5f14844e81dade9e55725642f42c76568c5ec908
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package com.ycl.task;
 
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper;
import com.ycl.common.enums.business.ProcessLogEventTypeEnum;
import com.ycl.common.enums.business.ProjectProcessTypeEnum;
import com.ycl.common.utils.DateUtils;
import com.ycl.domain.entity.ProcessCoding;
import com.ycl.domain.entity.ProcessLog;
import com.ycl.domain.entity.ProjectInfo;
import com.ycl.domain.entity.ProjectProcess;
import com.ycl.factory.FlowServiceFactory;
import com.ycl.mapper.ProcessCodingMapper;
import com.ycl.mapper.ProcessLogMapper;
import com.ycl.mapper.ProjectInfoMapper;
import com.ycl.mapper.ProjectProcessMapper;
import com.ycl.service.ProcessCodingService;
import com.ycl.service.ProcessLogService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.flowable.task.api.Task;
import org.flowable.task.api.TaskInfo;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
 
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
 
import static com.ycl.common.constant.ProcessOverTimeConstants.*;
import static com.ycl.common.enums.business.ProcessLogEventTypeEnum.CANCEL_HANGUP;
import static com.ycl.common.enums.business.ProcessLogEventTypeEnum.HANGUP;
 
@Slf4j
@Component("flowableTask")
public class FlowableTask extends FlowServiceFactory {
    @Autowired
    private ProjectProcessMapper projectProcessMapper;
    @Autowired
    private ProjectInfoMapper projectInfoMapper;
    @Autowired
    private ProcessCodingMapper processCodingMapper;
    @Autowired
    private ProcessLogService processLogService;
    @Autowired
    private ProcessLogMapper processLogMapper;
 
    /**
     * 赋码任务
     * 两个逻辑 改项目码、改节点颜色
     */
    public void expireTask() {
        log.info("开始定时任务:expireTask 赋码计算");
        //当前正在运行的所有任务节点
        List<Task> taskList = taskService.createTaskQuery().active().list();
        log.info("查询到当前活跃任务数量: {}", taskList.size());
        if (CollectionUtils.isEmpty(taskList)) {
            log.info("活跃任务列表为空,结束任务");
            return;
        }
 
        //排除掉节点挂起的任务
        List<String> allHangupTask = processLogMapper.getAllHangup();
        log.info("查询到挂起任务 ID 数量: {}", allHangupTask.size());
 
        List<Task> filteredTaskList = taskList.stream()
                .filter(task -> {
                    boolean isHangup = allHangupTask.contains(task.getId());
                    if (isHangup) {
                        log.info("任务 [{}] 因处于挂起状态被跳过", task.getId());
                    }
                    return !isHangup;
                })
                .collect(Collectors.toList());
 
        log.info("排除挂起任务后,待处理任务数量: {}", filteredTaskList.size());
 
        List<String> taskIds = filteredTaskList
        .stream().map(TaskInfo::getId).collect(Collectors.toList());
        //查询节点挂起日志
        Map<String, List<ProcessLog>> hangupLogMap = new LambdaQueryChainWrapper<>(processLogMapper)
                .in(ProcessLog::getEventType, HANGUP, CANCEL_HANGUP)
                .in(ProcessLog::getTaskId, taskIds)
                .list()
                .stream()
                .collect(Collectors.groupingBy(ProcessLog::getTaskId));
 
        //需要监控的赋码任务
        List<ProcessCoding> processCodingList = new LambdaQueryChainWrapper<>(processCodingMapper)
                .in(ProcessCoding::getTaskId, taskIds)
                .list();
 
        log.info("开始处理赋码计算逻辑。当前待处理活跃任务数: {}, 已配置监控的任务数: {}", taskIds.size(), processCodingList.size());
 
        Map<String, ProcessCoding> taskMap = new HashMap<>();
        Map<String, Date> startTaskMap = new HashMap<>();
        if (!CollectionUtils.isEmpty(processCodingList)) {
            //key为taskId value为本体对象
            taskMap = processCodingList.stream().collect(Collectors.toMap(ProcessCoding::getTaskId, Function.identity()));
            //拿到开始计时的节点集合 key:taskId value:开始时间
            startTaskMap = getStartTaskList(processCodingList);
            log.info("获取到计时起点记录数量: {}", startTaskMap.size());
        }
 
        //提前准备接收数据的map key:流程实例id value:需要改变的颜色
        Map<String, List<String>> map = new HashMap<>();
        List<ProcessCoding> list = new ArrayList<>();
        map.put(GREEN, new ArrayList<>());
        map.put(RED, new ArrayList<>());
        map.put(YELLOW, new ArrayList<>());
        Date now = new Date();
 
        //遍历所有代办的节点
        for (Task task : filteredTaskList) {
            String taskId = task.getId();
            String procInsId = task.getProcessInstanceId();
            ProcessCoding processCoding = taskMap.get(taskId);
 
            if (processCoding == null) {
                //不需要监控的任务节点项目码直接改为绿色
                log.info("任务 ID: {}, 流程实例: {}, 名称: {} 未在监控配置中,项目码默认设为 GREEN", taskId, procInsId, task.getName());
                map.get(GREEN).add(procInsId);
                continue;
            }
 
            //判断赋码统一用秒作为单位,且只需用红码时间判断超时,通过超时去改变项目的赋码状态,节点本身无赋码状态
            Date startTime = startTaskMap.get(processCoding.getStartTaskId());
            if (startTime == null) {
                log.warn("任务 ID: {}, 流程实例: {} 找不到计时起点时间 (StartTaskId: {}), 将默认归为 GREEN 以防止任务丢失", taskId, procInsId, processCoding.getStartTaskId());
                map.get(GREEN).add(procInsId);
                continue;
            }
 
            try {
                Long redTime = getTime(processCoding.getRedTime());
                //节点处理时间,需排除节假日
                long durationTime = DateUtils.getWorkingSed(startTime, now);
 
                //减去节点挂起时长
                long finalDurationTime = subNodeHangupTime(hangupLogMap, task, durationTime);
 
                String status = GREEN; // 默认状态为绿色
                String overtimeStatus = NORMAL;
                Long overtimeDurationSec = null; // 超时时长(秒,原始值)
                Double overtimeDurationHour = 0.0; // 超时时长(小时,转换后)
 
                if (redTime != null && redTime != 0 && finalDurationTime >= redTime) {
                    status = RED; // 如果超过红色时间阈值,则表明该任务超时
                    overtimeStatus = OVERTIME;
                    overtimeDurationSec = finalDurationTime - redTime;
                    overtimeDurationHour = Math.round((overtimeDurationSec / 3600.0) * 10) / 10.0;
 
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    log.info("任务超时预警:任务ID[{}], 流程实例[{}], 起点[{}], 当前[{}], 红码阈值[{}小时], 实际耗时[{}小时], 超时[{}小时]",
                            taskId, procInsId,
                            sdf.format(startTime), sdf.format(now),
                            Math.round((redTime / 3600.0) * 10) / 10.0,
                            Math.round((finalDurationTime / 3600.0) * 10) / 10.0,
                            overtimeDurationHour);
                } else {
                    log.info("任务未超时:任务ID[{}], 流程实例[{}], 实际耗时[{}小时], 红码阈值[{}小时]",
                            taskId, procInsId,
                            Math.round((finalDurationTime / 3600.0) * 10) / 10.0,
                            redTime != null ? Math.round((redTime / 3600.0) * 10) / 10.0 : 0);
                }
 
                map.get(status).add(procInsId);
                processCoding.setOverTimeTotal(String.valueOf(overtimeDurationHour));
                processCoding.setStatus(status);
                processCoding.setOvertimeStatus(overtimeStatus);
                processCoding.setStartTaskTime(task.getCreateTime());
                list.add(processCoding);
            } catch (Exception e) {
                log.error("任务 [{}] 赋码计算发生异常: {}", taskId, e.getMessage(), e);
            }
        }
        //更新项目码
        map.forEach((key, value) -> {
            if (!value.isEmpty()) {
                log.info("准备更新项目码为 {}: 流程实例列表 {}", key, value);
                updateProjectCoding(value, key);
            }
        });
        //更新节点状态 自定义的mybatis方法
        if (!CollectionUtils.isEmpty(list)) {
            log.info("批量更新节点状态数量: {}", list.size());
            processCodingMapper.updateBatch(list);
        }
 
        log.info("定时任务:expireTask 赋码计算结束");
    }
 
    //减去节点挂起时长
    private long subNodeHangupTime(Map<String, List<ProcessLog>> hangupLogMap, Task task, long durationTime) {
        List<ProcessLog> processLogs = hangupLogMap.get(task.getId());
        if (!CollectionUtils.isEmpty(processLogs)) {
            long hangupTime = 0;
            //分组分为挂起和取消挂起
            Map<ProcessLogEventTypeEnum, List<ProcessLog>> logEventTypeMap = processLogs.stream()
                    .sorted(Comparator.comparing(ProcessLog::getGmtCreate))
                    .collect(Collectors.groupingBy(ProcessLog::getEventType));
            List<ProcessLog> cancelHangup = logEventTypeMap.get(CANCEL_HANGUP);
            for (int i = 0; i < cancelHangup.size(); i++) {
                ProcessLog processLog = cancelHangup.get(i);
                hangupTime += processLog.getGmtCreate().getTime() - logEventTypeMap.get(HANGUP).get(i).getGmtCreate().getTime();
            }
            durationTime = durationTime - hangupTime;
        }
        return durationTime;
    }
 
    private Long getTime(String timeStr) {
        Long time = null;
        if (StringUtils.isNotBlank(timeStr)) {
            String[] timeArr = timeStr.split("-");
            // 解析天数和小时数
            int days = Integer.parseInt(timeArr[0]);
            int hours = 0;
            if (timeArr.length > 1) {
                hours = Integer.parseInt(timeArr[1]);
            }
            time = (days * 24L + hours) * 3600L;
//            //分-秒
//            time= (days * 60L) + hours;
        }
        return time;
    }
 
    private Map<String, Date> getStartTaskList(List<ProcessCoding> processCodingList) {
        //查出任务计时起始节点集合
        List<String> startTaskIds = processCodingList.stream().map(ProcessCoding::getStartTaskId).collect(Collectors.toList());
        //查出起始计时节点数据
        Map<String, Date> startDateMap = new HashMap<>();
        List<HistoricTaskInstance> hisStartTasks = historyService.createHistoricTaskInstanceQuery().taskIds(startTaskIds).list();
        if (!CollectionUtils.isEmpty(hisStartTasks)) {
            hisStartTasks.forEach(hisTask -> {
                startDateMap.put(hisTask.getId(), hisTask.getStartTime());
            });
        }
        return startDateMap;
    }
 
    /**
     * 赋码
     *
     * @param processInstanceIds 流程实例ID列表
     * @param coding             赋码值
     */
    private void updateProjectCoding(List<String> processInstanceIds, String coding) {
        if (CollectionUtils.isEmpty(processInstanceIds)) {
            return;
        }
 
        List<ProjectProcess> projectProcesses = new LambdaQueryChainWrapper<>(projectProcessMapper)
                .in(ProjectProcess::getProcessInsId, processInstanceIds)
                .eq(ProjectProcess::getProjectType, ProjectProcessTypeEnum.PROJECT)
                .list();
 
        log.info("赋码类型 [{}]: 查询到 {} 个对应的项目流程关系", coding, projectProcesses.size());
 
        List<String> projectIds = projectProcesses.stream()
                .map(p -> {
                    log.info("流程实例 [{}] 对应项目 ID [{}]", p.getProcessInsId(), p.getProjectId());
                    return p.getProjectId();
                })
                .collect(Collectors.toList());
 
        if (!CollectionUtils.isEmpty(projectIds)) {
            log.info("执行数据库更新: 将项目 {} 的编码更新为 {}", projectIds, coding);
            new LambdaUpdateChainWrapper<>(projectInfoMapper)
                    .in(ProjectInfo::getId, projectIds)
                    .set(ProjectInfo::getCoding, coding)
                    .update();
        } else {
            log.warn("未找到对应的项目 ID,无法执行编码更新。流程实例列表: {}", processInstanceIds);
        }
    }
}