old mode 100644
new mode 100755
 |  |  | 
 |  |  | import com.alibaba.fastjson2.JSONArray; | 
 |  |  | import com.alibaba.fastjson2.JSONObject; | 
 |  |  | import com.alibaba.fastjson2.TypeReference; | 
 |  |  | import com.baomidou.dynamic.datasource.annotation.DS; | 
 |  |  | import com.genersoft.iot.vmp.conf.MediaConfig; | 
 |  |  | import com.genersoft.iot.vmp.conf.UserSetting; | 
 |  |  | import com.genersoft.iot.vmp.gb28181.bean.*; | 
 |  |  | 
 |  |  | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | 
 |  |  | import com.genersoft.iot.vmp.storager.dao.*; | 
 |  |  | import com.genersoft.iot.vmp.utils.DateUtil; | 
 |  |  | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; | 
 |  |  | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; | 
 |  |  | import com.github.pagehelper.PageHelper; | 
 |  |  | import com.github.pagehelper.PageInfo; | 
 |  |  | import org.slf4j.Logger; | 
 |  |  | 
 |  |  | import java.util.stream.Collectors; | 
 |  |  |  | 
 |  |  | @Service | 
 |  |  | @DS("master") | 
 |  |  | public class StreamPushServiceImpl implements IStreamPushService { | 
 |  |  |  | 
 |  |  |     private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public boolean stop(String app, String streamId) { | 
 |  |  |         logger.info("[推流 ] 停止流: {}/{}", app, streamId); | 
 |  |  |         StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); | 
 |  |  |         gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); | 
 |  |  |         if (streamPushItem != null) { | 
 |  |  |             gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         platformGbStreamMapper.delByAppAndStream(app, streamId); | 
 |  |  |         gbStreamMapper.del(app, streamId); | 
 |  |  | 
 |  |  |                 streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         // 获取所有推流鉴权信息,清理过期的 | 
 |  |  |         List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); | 
 |  |  |         Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); | 
 |  |  |         for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { | 
 |  |  |             streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); | 
 |  |  |         } | 
 |  |  |         zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ | 
 |  |  |             if (mediaList == null) { | 
 |  |  |                 return; | 
 |  |  | 
 |  |  |                 for (StreamPushItem streamPushItem : streamPushItems) { | 
 |  |  |                     pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); | 
 |  |  |                     streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); | 
 |  |  |                     streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |             List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); | 
 |  |  | 
 |  |  |                     redisCatchStorage.sendStreamChangeMsg(type, jsonObject); | 
 |  |  |                     // 移除redis内流的信息 | 
 |  |  |                     redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); | 
 |  |  |                     // 冗余数据,自己系统中自用 | 
 |  |  |                     redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); | 
 |  |  |             if (streamAuthorityInfos.size() > 0) { | 
 |  |  |                 for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { | 
 |  |  |                     // 移除redis内流的信息 | 
 |  |  |                     redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |         })); | 
 |  |  | 
 |  |  |                 jsonObject.put("register", false); | 
 |  |  |                 jsonObject.put("mediaServerId", mediaServerId); | 
 |  |  |                 redisCatchStorage.sendStreamChangeMsg(type, jsonObject); | 
 |  |  |  | 
 |  |  |                 // 冗余数据,自己系统中自用 | 
 |  |  |                 redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  | 
 |  |  |         // 存储数据到stream_push表 | 
 |  |  |         streamPushMapper.addAll(streamPushItems); | 
 |  |  |         List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() | 
 |  |  |                 .filter(streamPushItem-> streamPushItem.getId() != null) | 
 |  |  |                 .filter(streamPushItem-> streamPushItem.getGbId() != null) | 
 |  |  |                 .collect(Collectors.toList()); | 
 |  |  |         // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 | 
 |  |  |         if (streamPushItemForGbStream.size() > 0) { | 
 |  |  | 
 |  |  |  | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |             if (streamPushItemListFroPlatform.size() > 0) { | 
 |  |  |             if (!streamPushItemListFroPlatform.isEmpty()) { | 
 |  |  |                 platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); | 
 |  |  |                 // 发送通知 | 
 |  |  |                 for (String platformId : platformForEvent.keySet()) { | 
 |  |  | 
 |  |  |         stream.setUpdateTime(DateUtil.getNow()); | 
 |  |  |         stream.setCreateTime(DateUtil.getNow()); | 
 |  |  |         stream.setServerId(userSetting.getServerId()); | 
 |  |  |         stream.setMediaServerId(mediaConfig.getId()); | 
 |  |  |         stream.setSelf(true); | 
 |  |  |         stream.setPushIng(true); | 
 |  |  |  | 
 |  |  |         // 放在事务内执行 | 
 |  |  |         boolean result = false; | 
 |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public ResourceBaceInfo getOverview() { | 
 |  |  |         return streamPushMapper.getOverview(userSetting.isUsePushingAsStatus()); | 
 |  |  |     public ResourceBaseInfo getOverview() { | 
 |  |  |         int total = streamPushMapper.getAllCount(); | 
 |  |  |         int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus()); | 
 |  |  |  | 
 |  |  |         return new ResourceBaseInfo(total, online); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public Map<String, StreamPushItem> getAllAppAndStreamMap() { | 
 |  |  |         return streamPushMapper.getAllAppAndStreamMap(); | 
 |  |  |     } | 
 |  |  | } |