|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSONObject; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.bean.GbStream; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IGbStreamService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IMediaServerService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.service.IMediaServerService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IStreamPushService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.utils.DateUtil; | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | 
|---|
|  |  |  | import javax.annotation.Resource; | 
|---|
|  |  |  | import java.util.ArrayList; | 
|---|
|  |  |  | import java.util.List; | 
|---|
|  |  |  | import java.util.Map; | 
|---|
|  |  |  | import java.util.concurrent.ConcurrentLinkedQueue; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | 
|---|
|  |  |  | //查询全部的app+stream 用于判断是添加还是修改 | 
|---|
|  |  |  | List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | 
|---|
|  |  |  | Map<String, StreamPushItem> allAppAndStream = streamPushService.getAllAppAndStreamMap(); | 
|---|
|  |  |  | Map<String, GbStream> allGBId = gbStreamService.getAllGBId(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 用于存储根据APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | 
|---|
|  |  |  | 
|---|
|  |  |  | for (StreamPushItem streamPushItem : streamPushItems) { | 
|---|
|  |  |  | String app = streamPushItem.getApp(); | 
|---|
|  |  |  | String stream = streamPushItem.getStream(); | 
|---|
|  |  |  | boolean contains = allAppAndStream.contains(app + stream); | 
|---|
|  |  |  | boolean contains = allAppAndStream.containsKey(app + stream); | 
|---|
|  |  |  | //不存在就添加 | 
|---|
|  |  |  | if (!contains) { | 
|---|
|  |  |  | if (allGBId.containsKey(streamPushItem.getGbId())) { | 
|---|
|  |  |  | GbStream gbStream = allGBId.get(streamPushItem.getGbId()); | 
|---|
|  |  |  | logger.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}", | 
|---|
|  |  |  | streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream()); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | streamPushItem.setStreamType("push"); | 
|---|
|  |  |  | streamPushItem.setCreateTime(DateUtil.getNow()); | 
|---|
|  |  |  | streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | 
|---|
|  |  |  | 
|---|
|  |  |  | streamPushItem.setOriginTypeStr("rtsp_push"); | 
|---|
|  |  |  | streamPushItem.setTotalReaderCount("0"); | 
|---|
|  |  |  | streamPushItemForSave.add(streamPushItem); | 
|---|
|  |  |  | allGBId.put(streamPushItem.getGbId(), streamPushItem); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | if (allGBId.containsKey(streamPushItem.getGbId()) | 
|---|
|  |  |  | && (!allGBId.get(streamPushItem.getGbId()).getApp().equals(streamPushItem.getApp()) || !allGBId.get(streamPushItem.getGbId()).getStream().equals(streamPushItem.getStream()))) { | 
|---|
|  |  |  | GbStream gbStream = allGBId.get(streamPushItem.getGbId()); | 
|---|
|  |  |  | logger.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}", | 
|---|
|  |  |  | streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream()); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //存在就只修改 name和gbId | 
|---|
|  |  |  | streamPushItemForUpdate.add(streamPushItem); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (streamPushItemForSave.size() > 0) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if (!streamPushItemForSave.isEmpty()) { | 
|---|
|  |  |  | logger.info("添加{}条",streamPushItemForSave.size()); | 
|---|
|  |  |  | logger.info(JSONObject.toJSONString(streamPushItemForSave)); | 
|---|
|  |  |  | streamPushService.batchAdd(streamPushItemForSave); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if(streamPushItemForUpdate.size()>0){ | 
|---|
|  |  |  | if(!streamPushItemForUpdate.isEmpty()){ | 
|---|
|  |  |  | logger.info("修改{}条",streamPushItemForUpdate.size()); | 
|---|
|  |  |  | logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | 
|---|
|  |  |  | gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }catch (Exception e) { | 
|---|
|  |  |  | logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); | 
|---|
|  |  |  | logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody())); | 
|---|
|  |  |  | logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|