fuliqi
2024-12-09 2587568bec69f9b9956851da73d626c39bc720db
ycl-server/src/main/java/com/ycl/task/UYTask.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson2.JSONObject;
import com.mongodb.client.result.DeleteResult;
import com.ycl.feign.UYClient;
import com.ycl.platform.domain.entity.TMonitor;
import com.ycl.platform.domain.entity.WorkOrder;
import com.ycl.platform.domain.param.UY.ImageDetectionParam;
import com.ycl.platform.domain.param.UY.MonitorQualifyParam;
@@ -15,6 +16,7 @@
import com.ycl.platform.domain.result.UY.VideoOnlineResult;
import com.ycl.platform.domain.vo.UpdateOnlineVO;
import com.ycl.platform.mapper.TMonitorMapper;
import com.ycl.platform.mapper.WorkOrderMapper;
import com.ycl.platform.service.UYErrorTypeCheckService;
import com.ycl.platform.service.WorkOrderService;
import com.ycl.platform.service.YwPointService;
@@ -24,11 +26,14 @@
import com.ycl.utils.CheckPointUtil;
import com.ycl.utils.DateUtils;
import constant.ApiConstants;
import constant.CheckConstants;
import constant.RedisConstant;
import enumeration.ErrorType;
import enumeration.general.WorkOrderStatusEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
@@ -40,6 +45,7 @@
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
//优云对接数据任务
@@ -54,6 +60,7 @@
    private final YwPointService pointService;
    private final TMonitorMapper monitorMapper;
    private final WorkOrderService workOrderService;
    private final WorkOrderMapper workOrderMapper;
    private final SysConfigMapper sysConfigMapper;
    private final CheckPointUtil checkPointUtil;
    @Value("${youYun.tenantId}")
@@ -100,7 +107,7 @@
                                item.setNo(item.getDeviceId());
                            }
                        });
                        pointService.setDeviceTagByGB(records);
                        pointService.setDeviceTagByGB(records, CheckConstants.Rule_Category_Video);
                        //存放在mongo中
                        mongoTemplate.insertAll(records);
                        // 工单生成
@@ -149,7 +156,7 @@
                                item.setNo(item.getSerialNumber().getShowValue());
                            }
                        });
                        pointService.setDeviceTagByGB(records);
                        pointService.setDeviceTagByGB(records,CheckConstants.Rule_Category_Or);
                        pointService.setNew(records);
                        //存放在mongo中
                        mongoTemplate.insertAll(records);
@@ -172,9 +179,10 @@
    }
    /**
     * 点位在线检测
     * 点位在线PING检测
     * 任务会先执行一次优云同步,然后执行ping检测
     * online字段来自于优云,pingOnline为主动ping检测的结果。存入mongo给数据中心查阅
     */
    //TODO:视频离线次数、监测次数(修改逻辑只针对工单,检测在线的)
    public void pointOnline() throws ExecutionException, InterruptedException {
        log.info("开始检测点位在线");
        Integer times = 2;
@@ -186,51 +194,35 @@
        } else {
            log.error("请配置离线次数,此次设置为默认值2");
        }
        // 先查出设备IP集合
        // 先查出设备IP集合,剔除掉在线情况是未知的,并且只检测正在考核的设备避免多余工单
        List<TMonitorResult> monitorList = monitorMapper.getDistinctIP();
//        List<Future<TMonitorResult>> futureList = new ArrayList<>(48);
        //补充错误时间点
        Query onlineQuery = new Query(Criteria.where("mongoCreateTime").gte(DateUtils.getDayStart(new Date())).lt(DateUtils.getDayEnd(new Date())));
        Map<String, TMonitorResult> mongoMap = mongoTemplate.find(onlineQuery, TMonitorResult.class)
                .stream()
                .collect(Collectors.toMap(TMonitorResult::getNo, Function.identity(), (existing, replacement) -> replacement));
        for (TMonitorResult result : monitorList) {
            TMonitorResult mongoData = mongoMap.get(result.getNo());
            if(mongoData!=null){
                result.setOffLineTimeStr(mongoData.getOffLineTimeStr());
            }
        }
        List<TMonitorResult> dataList = new ArrayList<>(48);
//        for (TMonitorResult monitor : monitorList) {
//            OnlineCheckThread thread = new OnlineCheckThread(monitor, checkPointUtil, times);
//            Future<TMonitorResult> future = executorService.submit(thread);
//            futureList.add(future);
//        }
//        for (Future<TMonitorResult> future : futureList) {
//            dataList.add(future.get()); // get方法会阻塞,直到拿到结果才继续执行for
//        }
        Integer time = times;
        List<CompletableFuture<TMonitorResult>> futureList = monitorList.stream()
                .map(monitor -> CompletableFuture.supplyAsync(() -> {
                            OnlineCheckThread thread = new OnlineCheckThread(monitor, checkPointUtil, time);
                            return thread.call(); // 假设 OnlineCheckThread 实现了 Callable 接口
                        }, executorService)
                        .orTimeout(40, TimeUnit.SECONDS)
                        .orTimeout(60, TimeUnit.SECONDS)
                        .exceptionally(ex -> {
                            if (ex instanceof TimeoutException) {
                                log.error("任务执行超时:"+monitor.getIp());
                            } else {
                                log.error("任务执行异常:"+monitor.getIp() + ex);
                                log.error("任务执行异常:"+monitor.getIp() +ex);
                                ex.printStackTrace();
                            }
                            int checkTimes = 1;
                            int offLineTimes = 1;
                            Map<String, Object> map = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstant.ONLINE_KEY, monitor.getIp());
                            if (!CollectionUtils.isEmpty(map)) {
                                checkTimes = (Integer) map.get("checkTimes") + 1;
                                offLineTimes = (Integer) map.get("offLineTimes");
                            }
                            monitor.setOnline(Boolean.FALSE);
                            monitor.setCheckCount(checkTimes);
                            monitor.setOffLineCount(offLineTimes);
                            if (monitor.getOffLineCount() >= time) {
                                WorkOrder workOrder = new WorkOrder();
                                workOrder.setSerialNumber(monitor.getNo());
                                List<String> errList = new ArrayList<>();
                                errList.add(ErrorType.DEVICE_OFFLINE.getValue());
                                workOrder.setErrorTypeList(errList);
                                workOrder.setStatus(WorkOrderStatusEnum.DISTRIBUTED);
                                monitor.setWorkOrder(workOrder);
                            }
                            return monitor; // 返回失败的结果
                            return null;
                        }))
                .collect(Collectors.toList());
@@ -239,42 +231,82 @@
                futureList.toArray(new CompletableFuture[0])
        );
        try {
            allOf.get(35, TimeUnit.SECONDS); // 给予额外的5秒来收集结果
            allOf.get(60, TimeUnit.SECONDS); // 整体最多等待60秒来收集结果
        } catch (TimeoutException e) {
            log.warn("部分任务未在指定时间内完成");
            log.error("部分任务未在指定时间内完成");
        } catch (Exception e2){
            log.error("数据收集异常"+e2);
        }
        dataList = futureList.stream()
                .map(CompletableFuture::join)
                .filter(result -> result != null)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        // 更新point表的在线标识
        Date now = new Date();
        List<UpdateOnlineVO> willUpdateList = dataList.stream().map(item -> {
            UpdateOnlineVO vo = new UpdateOnlineVO();
            vo.setOnline(item.getOnline());
            vo.setIp(item.getIp());
            vo.setUpdateTime(now);
            return vo;
        }).collect(Collectors.toList());
        monitorMapper.updateOnline(willUpdateList);
        // 工单
        List<WorkOrder> workOrderList = dataList.stream()
                .filter(item -> Objects.nonNull(item.getWorkOrder()))
                .map(TMonitorResult::getWorkOrder)
                .collect(Collectors.toList());
        if (!CollectionUtils.isEmpty(workOrderList)) {
            workOrderService.innerAddWorkOrder(workOrderList);
        List<String> offLineList = new ArrayList<>();
        List<String> onLineList = new ArrayList<>();
        //查出数据库纯车辆或纯人脸设备
//        List<String> serialNumbers = monitorMapper.selectCarOrFace().stream().map(TMonitor::getSerialNumber).collect(Collectors.toList());
        dataList.forEach(item->{
            if(item.getPingOnline()) {
                onLineList.add(item.getIp());
            } else if(!item.getPingOnline()) {
                //筛选出ping离线的设备,更改数据库为离线
                offLineList.add(item.getIp());
            }
        });
        if(!CollectionUtils.isEmpty(offLineList)) {
            monitorMapper.batchUpdateOnline(offLineList, now, ApiConstants.UY_OnlineSite_Offline);
        }
        if(!CollectionUtils.isEmpty(onLineList)) {
            monitorMapper.batchUpdateOnline(onLineList, now, ApiConstants.UY_OnlineSite_Online);
        }
        //存放到mongo
        if (!CollectionUtils.isEmpty(dataList)) {
            List<TMonitorResult> mongoList = new ArrayList<>();
            dataList.forEach(item->{
                String monitorType = item.getMonitorType();
                //同一个设备多个类型每个类型存一条数据,以此在保留数据的情况下区分省厅标签
                if(StringUtils.isNotEmpty(monitorType)){
                    String[] monitors = monitorType.split("/");
                    for (String type : monitors) {
                        TMonitorResult mongoData = new TMonitorResult();
                        BeanUtils.copyProperties(item,mongoData);
                        mongoData.setMonitorType(type);
                        if("1".equals(type)){
                            mongoData.setProvinceTag(mongoData.getProvinceTagVideo());
                        }else if("2".equals(type)){
                            mongoData.setProvinceTag(mongoData.getProvinceTagCar());
                        }else if("3".equals(type)){
                            mongoData.setProvinceTag(mongoData.getProvinceTagFace());
                        }
                        mongoList.add(mongoData);
                    }
                }
            });
            //如果存在之前的数据先删除
            Query query = new Query(Criteria.where("mongoCreateTime").gte(DateUtils.getDayStart(new Date())).lt(DateUtils.getDayEnd(new Date())));
            DeleteResult result = mongoTemplate.remove(query, TMonitorResult.class);
            //存放在mongo中
            mongoTemplate.insertAll(dataList);
            mongoTemplate.insertAll(mongoList);
        }
        //工单(同一IP只生成一个工单)
        //查询数据库已存在的离线工单获取ip集合,剔除
        List<String> ips = workOrderMapper.getOfflineWorkOrder();
        List<WorkOrder> workOrderList = dataList.stream()
                .filter(item -> Objects.nonNull(item.getWorkOrder()) && (CollectionUtils.isEmpty(ips) || !ips.contains(item.getIp())))
                .collect(Collectors.toMap(
                        TMonitorResult::getIp,
                        Function.identity(),
                        (existing, replacement) -> existing // 如果遇到相同的 IP,保留第一个 TMonitorResult 对象
                ))
                .values()
                .stream()
                .map(TMonitorResult::getWorkOrder)
                .collect(Collectors.toList());
        if (!CollectionUtils.isEmpty(workOrderList)) {
            workOrderService.innerAddWorkOrder(workOrderList);
        }
        log.info("点位在线监测完成");
    }
@@ -302,11 +334,34 @@
                                .where("mongoCreateTime").gte(DateUtils.getDayStart(new Date())).lt(DateUtils.getDayEnd(new Date())));
                        DeleteResult result = mongoTemplate.remove(query, VideoOnlineResult.class);
                        //打标签
                        pointService.setDeviceTagByGB(records);
                        records.forEach(item -> {
                            if (Objects.nonNull(item.getDeviceId())) {
                                item.setNo(item.getDeviceId());
                            }
                        });
                        //貌似这里可以忽略省厅标签,因为项目展示的是后面ping检测时存入的数据
                        pointService.setDeviceTagByGB(records,CheckConstants.Rule_Category_Video);
                        //存放在mongo中
                        mongoTemplate.insertAll(records);
                        // 工单生成
                        uyErrorTypeCheckService.videoOnlineCheck(records);
                        //更新point表在线状态
                        Date now = new Date();
                        List<UpdateOnlineVO> willUpdateList = records.stream().map(item -> {
                            UpdateOnlineVO vo = new UpdateOnlineVO();
                            vo.setOnline(item.getStatus());
                            vo.setIp(item.getIpAddr());
                            vo.setUpdateTime(now);
                            return vo;
                        }).collect(Collectors.toList());
                        monitorMapper.updateOnlineFromUyOrHk(willUpdateList);
                        //离线生成工单,一个ip只生成一个工单
                        List<VideoOnlineResult> workOrders = new ArrayList<>(records.stream()
                                .filter(item -> ApiConstants.UY_OnlineSite_Offline.equals(item.getStatus()))
                                .collect(Collectors.toMap(
                                        VideoOnlineResult::getIpAddr,
                                        Function.identity(),
                                        (existing, replacement) -> existing // 如果遇到相同的 IP,保留第一个(existing)
                                )).values());
                        uyErrorTypeCheckService.videoOnlineCheck(workOrders);
                    } else {
                        log.error("点位在线结果数据为空{}", data);
                    }
@@ -352,11 +407,11 @@
                        }
                    });
                    //打标签
                    pointService.setDeviceTagByGB(records);
                    pointService.setDeviceTagByGB(records,CheckConstants.Rule_Category_Video);
                    //存放在mongo中
                    mongoTemplate.insertAll(records);
                    // 工单生成
                    uyErrorTypeCheckService.recordMetaDSumCheck(records);
//                    // 工单生成
//                    uyErrorTypeCheckService.recordMetaDSumCheck(records);
                }
            } else {
                log.error("录像可用数据为空{}", jsonObject);