package com.tievd.jyz.handler; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.activerecord.Model; import com.baomidou.mybatisplus.extension.service.IService; import com.tievd.jyz.cache.AlgorithmCache; import com.tievd.jyz.cache.EventCodeRelOilCache; import com.tievd.jyz.cache.EventCodeResourceCache; import com.tievd.jyz.constants.SystemConstant; import com.tievd.jyz.entity.OilEvent; import com.tievd.jyz.entity.OilRecord; import com.tievd.jyz.handler.alg.AlgHandleManager; import com.tievd.jyz.mapper.OilEventMapper; import com.tievd.jyz.mapper.OilRecordMapper; import com.tievd.jyz.mqtt.dto.EventInfoDTO; import com.tievd.jyz.mqtt.dto.EventResourceDTO; import com.tievd.jyz.mqtt.dto.MqttParamDTO; import com.tievd.jyz.util.MultiMinioUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.List; import java.util.Map; /** * 事件接收处理类 * @author timi */ @Slf4j @Component public class EventInfoHandler { @Autowired private AlgHandleManager algHandleManager; @Autowired private OilRecordMapper oilRecordMapper; @Autowired private OilEventMapper oilEventMapper; @Autowired private MultiMinioUtil minioUtil; @Autowired ApplicationContext context; /** * 事件处理入口 * @param mqttParamDTO */ public void event(MqttParamDTO mqttParamDTO){ List eventInfoDTOList = JSONArray.parseArray(mqttParamDTO.getData().toJSONString(), EventInfoDTO.class); for(EventInfoDTO eventInfoDTO :eventInfoDTOList){ algHandleManager.eventHandle(eventInfoDTO,mqttParamDTO.getSn(),mqttParamDTO.getTime()); } } /** * 资源处理入口 * @param mqttParamDTO */ public void resource(MqttParamDTO mqttParamDTO){ EventResourceDTO eventResourceDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), EventResourceDTO.class); String algCode = eventResourceDTO.getAlgCode(); if (StringUtils.isBlank(algCode)) { log.info("========================> 资源未上报对应的algCode"); return; } //当前事件唯一标识 String eventCode = eventResourceDTO.getEventCode(); log.info("准备处理资源-类型:{}, code:{}", algCode, eventCode); if (AlgorithmCache.OIL_ALG_CODE.contains(algCode)) { oilResource(mqttParamDTO); } else { eventResource(AlgorithmCache.getEntity(algCode), AlgorithmCache.getService(algCode),mqttParamDTO); } } public void oilResource(MqttParamDTO mqttParamDTO){ EventResourceDTO eventResourceDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), EventResourceDTO.class); //当前事件唯一标识 String eventCode = eventResourceDTO.getEventCode(); //终端编码 String cameraCode = eventResourceDTO.getCameraCode(); //图片路径 String imgPath = eventResourceDTO.getImgPath(); //视频路径 String videoPath = eventResourceDTO.getVideoPath(); //资源类型 String sourceType = eventResourceDTO.getSourceType(); List oilRecordIdList = EventCodeRelOilCache.remove(eventCode); //通用事件资源 if(oilRecordIdList.size() == 0){ LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(OilEvent::getImgUid, eventCode); long count = oilEventMapper.selectCount(wrapper); if (count == 0) { //事件和记录均未查到数据,则先缓存图片信息,防止因消费顺序和相似度延迟入库导致的图片保存失败 EventCodeResourceCache.put(eventCode, mqttParamDTO); return; } log.info("-------------->准备更新事件资源:{}", eventCode); //更新异常事件资源 OilEvent oilEvent = new OilEvent(); oilEvent.setImgUid(eventCode); oilEvent.setImgPath(imgPath); oilEvent.setVideoPath(videoPath); oilEventMapper.update(oilEvent, wrapper); return; } //车辆识别资源 for(Long oilRecordId :oilRecordIdList){ //更新加油记录资源 OilRecord oilRecord = oilRecordMapper.selectById(oilRecordId); if(ObjectUtil.isNotNull(oilRecord)){ if(StringUtils.isEmpty(oilRecord.getImgPath())){ oilRecord.setImgPath(imgPath); }else{ oilRecord.setOutImgPath(imgPath); } oilRecordMapper.updateById(oilRecord); } else { String[] img = imgPath.split(SystemConstant.s3_split_char); if (img.length == 2) { minioUtil.removeObject(img[0], img[1]); } } } EventCodeResourceCache.remove(eventCode); } @SneakyThrows public void eventResource(E entity, Class serviceClass, MqttParamDTO mqttParamDTO) { IService service = context.getBean(serviceClass); EventResourceDTO eventResourceDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), EventResourceDTO.class); //当前事件唯一标识 String eventCode = eventResourceDTO.getEventCode(); //图片路径 String imgPath = eventResourceDTO.getImgPath(); //视频路径 String videoPath = eventResourceDTO.getVideoPath(); //通用事件资源 QueryWrapper wrapper = new QueryWrapper<>(); wrapper.eq("img_uid", eventCode); long count = service.count(wrapper); if (count == 0) { //事件和记录均未查到数据,则先缓存图片信息,防止因消费顺序和相似度延迟入库导致的图片保存失败 EventCodeResourceCache.put(eventCode, mqttParamDTO); return; } log.info("-------------->准备更新事件资源:{}", eventCode); //更新异常事件资源 Method imgMethod = entity.getClass().getMethod("setImgPath", String.class); Method videoMethod = entity.getClass().getMethod("setVideoPath", String.class); imgMethod.invoke(entity, imgPath); videoMethod.invoke(entity, videoPath); service.update(entity, wrapper); EventCodeResourceCache.remove(eventCode); } @Scheduled(cron = "${jyz.timer.img-cache-handle:0 0/2 * * * ?}") public void carImgSet() { Map map = EventCodeResourceCache.asMap(); if (map.size() == 0) { return; } log.info("-------------->缓存图片数据重进流程<----------------"); for (Map.Entry entry : map.entrySet()) { resource(entry.getValue()); } } }