|  |  | 
 |  |  | import com.genersoft.iot.vmp.service.IGbStreamService; | 
 |  |  | import com.genersoft.iot.vmp.service.IMediaServerService; | 
 |  |  | import com.genersoft.iot.vmp.service.IStreamPushService; | 
 |  |  | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; | 
 |  |  | import com.genersoft.iot.vmp.utils.DateUtil; | 
 |  |  | import org.slf4j.Logger; | 
 |  |  | import org.slf4j.LoggerFactory; | 
 |  |  | 
 |  |  | import org.springframework.stereotype.Component; | 
 |  |  |  | 
 |  |  | import javax.annotation.Resource; | 
 |  |  | import java.util.*; | 
 |  |  | import java.util.ArrayList; | 
 |  |  | import java.util.List; | 
 |  |  | import java.util.concurrent.ConcurrentLinkedQueue; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  | 
 |  |  |     @Resource | 
    // Service used by onMessage to update entries whose app+stream key is already known
    // (via updateGbIdOrName).
 |  |  |     private IGbStreamService gbStreamService; | 
 |  |  |  | 
    // Flag that onMessage sets to true before submitting a queue-draining task and that the task
    // resets to false once the queue has been drained.
    // NOTE(review): onMessage also gates on whether the queue was empty before the offer; the two
    // guards look like alternative versions of the same check - confirm which one is intended.
 |  |  |     private boolean taskQueueHandlerRun = false; | 
 |  |  |  | 
    // Buffer of received Redis messages: onMessage offers into it and the task submitted to
    // taskExecutor polls from it until it is empty.
 |  |  |     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
 |  |  |  | 
 |  |  | 
 |  |  |     @Override | 
 |  |  |     public void onMessage(Message message, byte[] bytes) { | 
 |  |  |         logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); | 
 |  |  |  | 
 |  |  |         boolean isEmpty = taskQueue.isEmpty(); | 
 |  |  |         taskQueue.offer(message); | 
 |  |  |         if (!taskQueueHandlerRun) { | 
 |  |  |             taskQueueHandlerRun = true; | 
 |  |  |         if (isEmpty) { | 
 |  |  |             taskExecutor.execute(() -> { | 
 |  |  |                 while (!taskQueue.isEmpty()) { | 
 |  |  |                     Message msg = taskQueue.poll(); | 
 |  |  |                     List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | 
 |  |  |                     //查询全部的app+stream 用于判断是添加还是修改 | 
 |  |  |                     List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | 
 |  |  |                     try { | 
 |  |  |                         List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | 
 |  |  |                         //查询全部的app+stream 用于判断是添加还是修改 | 
 |  |  |                         List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | 
 |  |  |  | 
 |  |  |                     /** | 
 |  |  |                      * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | 
 |  |  |                      */ | 
 |  |  |                     List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); | 
 |  |  |                     List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); | 
 |  |  |                     for (StreamPushItem streamPushItem : streamPushItems) { | 
 |  |  |                         String app = streamPushItem.getApp(); | 
 |  |  |                         String stream = streamPushItem.getStream(); | 
 |  |  |                         boolean contains = allAppAndStream.contains(app + stream); | 
 |  |  |                         //不存在就添加 | 
 |  |  |                         if (!contains) { | 
 |  |  |                             streamPushItem.setStreamType("push"); | 
 |  |  |                             streamPushItem.setCreateTime(DateUtil.getNow()); | 
 |  |  |                             streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | 
 |  |  |                             streamPushItem.setOriginType(2); | 
 |  |  |                             streamPushItem.setOriginTypeStr("rtsp_push"); | 
 |  |  |                             streamPushItem.setTotalReaderCount("0"); | 
 |  |  |                             streamPushItemForSave.add(streamPushItem); | 
 |  |  |                         } else { | 
 |  |  |                             //存在就只修改 name和gbId | 
 |  |  |                             streamPushItemForUpdate.add(streamPushItem); | 
 |  |  |                         /** | 
 |  |  |                          * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | 
 |  |  |                          */ | 
 |  |  |                         List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); | 
 |  |  |                         List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); | 
 |  |  |                         for (StreamPushItem streamPushItem : streamPushItems) { | 
 |  |  |                             String app = streamPushItem.getApp(); | 
 |  |  |                             String stream = streamPushItem.getStream(); | 
 |  |  |                             boolean contains = allAppAndStream.contains(app + stream); | 
 |  |  |                             //不存在就添加 | 
 |  |  |                             if (!contains) { | 
 |  |  |                                 streamPushItem.setStreamType("push"); | 
 |  |  |                                 streamPushItem.setCreateTime(DateUtil.getNow()); | 
 |  |  |                                 streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | 
 |  |  |                                 streamPushItem.setOriginType(2); | 
 |  |  |                                 streamPushItem.setOriginTypeStr("rtsp_push"); | 
 |  |  |                                 streamPushItem.setTotalReaderCount("0"); | 
 |  |  |                                 streamPushItemForSave.add(streamPushItem); | 
 |  |  |                             } else { | 
 |  |  |                                 //存在就只修改 name和gbId | 
 |  |  |                                 streamPushItemForUpdate.add(streamPushItem); | 
 |  |  |                             } | 
 |  |  |                         } | 
 |  |  |                     } | 
 |  |  |                     if (streamPushItemForSave.size() > 0) { | 
 |  |  |                         if (streamPushItemForSave.size() > 0) { | 
 |  |  |  | 
 |  |  |                         logger.info("添加{}条",streamPushItemForSave.size()); | 
 |  |  |                         logger.info(JSONObject.toJSONString(streamPushItemForSave)); | 
 |  |  |                         streamPushService.batchAdd(streamPushItemForSave); | 
 |  |  |                             logger.info("添加{}条",streamPushItemForSave.size()); | 
 |  |  |                             logger.info(JSONObject.toJSONString(streamPushItemForSave)); | 
 |  |  |                             streamPushService.batchAdd(streamPushItemForSave); | 
 |  |  |  | 
 |  |  |                     } | 
 |  |  |                     if(streamPushItemForUpdate.size()>0){ | 
 |  |  |                         logger.info("修改{}条",streamPushItemForUpdate.size()); | 
 |  |  |                         logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | 
 |  |  |                         gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | 
 |  |  |                         } | 
 |  |  |                         if(streamPushItemForUpdate.size()>0){ | 
 |  |  |                             logger.info("修改{}条",streamPushItemForUpdate.size()); | 
 |  |  |                             logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | 
 |  |  |                             gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | 
 |  |  |                         } | 
 |  |  |                     }catch (Exception e) { | 
 |  |  |                         logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); | 
 |  |  |                         logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |                 taskQueueHandlerRun = false; | 
 |  |  |             }); | 
 |  |  |         } | 
 |  |  |     } |