old mode 100644
new mode 100755
|  |  |  | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IGbStreamService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IMediaServerService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IStreamPushService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.utils.DateUtil; | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | import org.slf4j.LoggerFactory; | 
|---|
|  |  |  | 
|---|
|  |  |  | import org.springframework.stereotype.Component; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import javax.annotation.Resource; | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  | import java.util.ArrayList; | 
|---|
|  |  |  | import java.util.List; | 
|---|
|  |  |  | import java.util.concurrent.ConcurrentLinkedQueue; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | private IGbStreamService gbStreamService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private boolean taskQueueHandlerRun = false; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message message, byte[] bytes) { | 
|---|
|  |  |  | logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | boolean isEmpty = taskQueue.isEmpty(); | 
|---|
|  |  |  | taskQueue.offer(message); | 
|---|
|  |  |  | if (!taskQueueHandlerRun) { | 
|---|
|  |  |  | taskQueueHandlerRun = true; | 
|---|
|  |  |  | if (isEmpty) { | 
|---|
|  |  |  | taskExecutor.execute(() -> { | 
|---|
|  |  |  | while (!taskQueue.isEmpty()) { | 
|---|
|  |  |  | Message msg = taskQueue.poll(); | 
|---|
|  |  |  | List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | 
|---|
|  |  |  | //查询全部的app+stream 用于判断是添加还是修改 | 
|---|
|  |  |  | List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | 
|---|
|  |  |  | //查询全部的app+stream 用于判断是添加还是修改 | 
|---|
|  |  |  | List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); | 
|---|
|  |  |  | List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); | 
|---|
|  |  |  | for (StreamPushItem streamPushItem : streamPushItems) { | 
|---|
|  |  |  | String app = streamPushItem.getApp(); | 
|---|
|  |  |  | String stream = streamPushItem.getStream(); | 
|---|
|  |  |  | boolean contains = allAppAndStream.contains(app + stream); | 
|---|
|  |  |  | //不存在就添加 | 
|---|
|  |  |  | if (!contains) { | 
|---|
|  |  |  | streamPushItem.setStreamType("push"); | 
|---|
|  |  |  | streamPushItem.setCreateTime(DateUtil.getNow()); | 
|---|
|  |  |  | streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | 
|---|
|  |  |  | streamPushItem.setOriginType(2); | 
|---|
|  |  |  | streamPushItem.setOriginTypeStr("rtsp_push"); | 
|---|
|  |  |  | streamPushItem.setTotalReaderCount("0"); | 
|---|
|  |  |  | streamPushItemForSave.add(streamPushItem); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | //存在就只修改 name和gbId | 
|---|
|  |  |  | streamPushItemForUpdate.add(streamPushItem); | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); | 
|---|
|  |  |  | List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); | 
|---|
|  |  |  | for (StreamPushItem streamPushItem : streamPushItems) { | 
|---|
|  |  |  | String app = streamPushItem.getApp(); | 
|---|
|  |  |  | String stream = streamPushItem.getStream(); | 
|---|
|  |  |  | boolean contains = allAppAndStream.contains(app + stream); | 
|---|
|  |  |  | //不存在就添加 | 
|---|
|  |  |  | if (!contains) { | 
|---|
|  |  |  | streamPushItem.setStreamType("push"); | 
|---|
|  |  |  | streamPushItem.setCreateTime(DateUtil.getNow()); | 
|---|
|  |  |  | streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | 
|---|
|  |  |  | streamPushItem.setOriginType(2); | 
|---|
|  |  |  | streamPushItem.setOriginTypeStr("rtsp_push"); | 
|---|
|  |  |  | streamPushItem.setTotalReaderCount("0"); | 
|---|
|  |  |  | streamPushItemForSave.add(streamPushItem); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | //存在就只修改 name和gbId | 
|---|
|  |  |  | streamPushItemForUpdate.add(streamPushItem); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (streamPushItemForSave.size() > 0) { | 
|---|
|  |  |  | if (streamPushItemForSave.size() > 0) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | logger.info("添加{}条",streamPushItemForSave.size()); | 
|---|
|  |  |  | logger.info(JSONObject.toJSONString(streamPushItemForSave)); | 
|---|
|  |  |  | streamPushService.batchAdd(streamPushItemForSave); | 
|---|
|  |  |  | logger.info("添加{}条",streamPushItemForSave.size()); | 
|---|
|  |  |  | logger.info(JSONObject.toJSONString(streamPushItemForSave)); | 
|---|
|  |  |  | streamPushService.batchAdd(streamPushItemForSave); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if(streamPushItemForUpdate.size()>0){ | 
|---|
|  |  |  | logger.info("修改{}条",streamPushItemForUpdate.size()); | 
|---|
|  |  |  | logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | 
|---|
|  |  |  | gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if(streamPushItemForUpdate.size()>0){ | 
|---|
|  |  |  | logger.info("修改{}条",streamPushItemForUpdate.size()); | 
|---|
|  |  |  | logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | 
|---|
|  |  |  | gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }catch (Exception e) { | 
|---|
|  |  |  | logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); | 
|---|
|  |  |  | logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | taskQueueHandlerRun = false; | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|