package com.tievd.jyz.handler;
|
|
import cn.hutool.core.util.ObjectUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.extension.activerecord.Model;
|
import com.baomidou.mybatisplus.extension.service.IService;
|
import com.tievd.jyz.cache.AlgorithmCache;
|
import com.tievd.jyz.cache.EventCodeRelOilCache;
|
import com.tievd.jyz.cache.EventCodeResourceCache;
|
import com.tievd.jyz.constants.SystemConstant;
|
import com.tievd.jyz.entity.OilEvent;
|
import com.tievd.jyz.entity.OilRecord;
|
import com.tievd.jyz.handler.alg.AlgHandleManager;
|
import com.tievd.jyz.mapper.OilEventMapper;
|
import com.tievd.jyz.mapper.OilRecordMapper;
|
import com.tievd.jyz.mqtt.dto.EventInfoDTO;
|
import com.tievd.jyz.mqtt.dto.EventResourceDTO;
|
import com.tievd.jyz.mqtt.dto.MqttParamDTO;
|
import com.tievd.jyz.util.MultiMinioUtil;
|
import lombok.SneakyThrows;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.ApplicationContext;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.lang.reflect.Method;
|
import java.util.List;
|
import java.util.Map;
|
|
/**
|
* 事件接收处理类
|
* @author timi
|
*/
|
@Slf4j
|
@Component
|
public class EventInfoHandler {
|
|
@Autowired
|
private AlgHandleManager algHandleManager;
|
@Autowired
|
private OilRecordMapper oilRecordMapper;
|
@Autowired
|
private OilEventMapper oilEventMapper;
|
@Autowired
|
private MultiMinioUtil minioUtil;
|
|
@Autowired
|
ApplicationContext context;
|
|
|
/**
|
* 事件处理入口
|
* @param mqttParamDTO
|
*/
|
public void event(MqttParamDTO<JSON> mqttParamDTO){
|
List<EventInfoDTO> eventInfoDTOList = JSONArray.parseArray(mqttParamDTO.getData().toJSONString(), EventInfoDTO.class);
|
for(EventInfoDTO eventInfoDTO :eventInfoDTOList){
|
algHandleManager.eventHandle(eventInfoDTO,mqttParamDTO.getSn(),mqttParamDTO.getTime());
|
}
|
}
|
|
/**
|
* 资源处理入口
|
* @param mqttParamDTO
|
*/
|
|
public void resource(MqttParamDTO<JSON> mqttParamDTO){
|
EventResourceDTO eventResourceDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), EventResourceDTO.class);
|
String algCode = eventResourceDTO.getAlgCode();
|
if (StringUtils.isBlank(algCode)) {
|
log.info("========================> 资源未上报对应的algCode");
|
return;
|
}
|
//当前事件唯一标识
|
String eventCode = eventResourceDTO.getEventCode();
|
|
log.info("准备处理资源-类型:{}, code:{}", algCode, eventCode);
|
if (AlgorithmCache.OIL_ALG_CODE.contains(algCode)) {
|
oilResource(mqttParamDTO);
|
} else {
|
eventResource(AlgorithmCache.getEntity(algCode), AlgorithmCache.getService(algCode),mqttParamDTO);
|
}
|
}
|
|
public void oilResource(MqttParamDTO<JSON> mqttParamDTO){
|
EventResourceDTO eventResourceDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), EventResourceDTO.class);
|
//当前事件唯一标识
|
String eventCode = eventResourceDTO.getEventCode();
|
//终端编码
|
String cameraCode = eventResourceDTO.getCameraCode();
|
//图片路径
|
String imgPath = eventResourceDTO.getImgPath();
|
//视频路径
|
String videoPath = eventResourceDTO.getVideoPath();
|
//资源类型
|
String sourceType = eventResourceDTO.getSourceType();
|
List<Long> oilRecordIdList = EventCodeRelOilCache.remove(eventCode);
|
//通用事件资源
|
if(oilRecordIdList.size() == 0){
|
LambdaQueryWrapper<OilEvent> wrapper = new LambdaQueryWrapper<>();
|
wrapper.eq(OilEvent::getImgUid, eventCode);
|
long count = oilEventMapper.selectCount(wrapper);
|
if (count == 0) {
|
//事件和记录均未查到数据,则先缓存图片信息,防止因消费顺序和相似度延迟入库导致的图片保存失败
|
EventCodeResourceCache.put(eventCode, mqttParamDTO);
|
return;
|
}
|
log.info("-------------->准备更新事件资源:{}", eventCode);
|
//更新异常事件资源
|
OilEvent oilEvent = new OilEvent();
|
oilEvent.setImgUid(eventCode);
|
oilEvent.setImgPath(imgPath);
|
oilEvent.setVideoPath(videoPath);
|
oilEventMapper.update(oilEvent, wrapper);
|
return;
|
}
|
//车辆识别资源
|
for(Long oilRecordId :oilRecordIdList){
|
//更新加油记录资源
|
OilRecord oilRecord = oilRecordMapper.selectById(oilRecordId);
|
if(ObjectUtil.isNotNull(oilRecord)){
|
if(StringUtils.isEmpty(oilRecord.getImgPath())){
|
oilRecord.setImgPath(imgPath);
|
}else{
|
oilRecord.setOutImgPath(imgPath);
|
}
|
oilRecordMapper.updateById(oilRecord);
|
} else {
|
String[] img = imgPath.split(SystemConstant.s3_split_char);
|
if (img.length == 2) {
|
minioUtil.removeObject(img[0], img[1]);
|
}
|
}
|
}
|
EventCodeResourceCache.remove(eventCode);
|
}
|
|
|
@SneakyThrows
|
public <E extends Model> void eventResource(E entity, Class<? extends IService> serviceClass, MqttParamDTO<JSON> mqttParamDTO) {
|
IService service = context.getBean(serviceClass);
|
|
EventResourceDTO eventResourceDTO = JSONObject.toJavaObject(mqttParamDTO.getData(), EventResourceDTO.class);
|
//当前事件唯一标识
|
String eventCode = eventResourceDTO.getEventCode();
|
//图片路径
|
String imgPath = eventResourceDTO.getImgPath();
|
//视频路径
|
String videoPath = eventResourceDTO.getVideoPath();
|
//通用事件资源
|
QueryWrapper wrapper = new QueryWrapper<>();
|
wrapper.eq("img_uid", eventCode);
|
long count = service.count(wrapper);
|
if (count == 0) {
|
//事件和记录均未查到数据,则先缓存图片信息,防止因消费顺序和相似度延迟入库导致的图片保存失败
|
EventCodeResourceCache.put(eventCode, mqttParamDTO);
|
return;
|
}
|
log.info("-------------->准备更新事件资源:{}", eventCode);
|
//更新异常事件资源
|
Method imgMethod = entity.getClass().getMethod("setImgPath", String.class);
|
Method videoMethod = entity.getClass().getMethod("setVideoPath", String.class);
|
imgMethod.invoke(entity, imgPath);
|
videoMethod.invoke(entity, videoPath);
|
service.update(entity, wrapper);
|
|
EventCodeResourceCache.remove(eventCode);
|
}
|
|
|
@Scheduled(cron = "${jyz.timer.img-cache-handle:0 0/2 * * * ?}")
|
public void carImgSet() {
|
Map<String, MqttParamDTO> map = EventCodeResourceCache.asMap();
|
if (map.size() == 0) {
|
return;
|
}
|
log.info("-------------->缓存图片数据重进流程<----------------");
|
for (Map.Entry<String, MqttParamDTO> entry : map.entrySet()) {
|
resource(entry.getValue());
|
}
|
}
|
}
|