| | |
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.*; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; |
| | | import com.genersoft.iot.vmp.service.IGbStreamService; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | |
| | | import org.springframework.transaction.TransactionDefinition; |
| | | import org.springframework.transaction.TransactionStatus; |
| | | import org.springframework.util.ObjectUtils; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | |
| | | |
| | | Map<String, StreamPushItem> result = new HashMap<>(); |
| | | |
| | | List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {}); |
| | | for (MediaItem item : mediaItems) { |
| | | List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {}); |
| | | for (OnStreamChangedHookParam item : onStreamChangedHookParams) { |
| | | |
| | | // 不保存国标推理以及拉流代理的流 |
| | | if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() |
| | |
| | | return new ArrayList<>(result.values()); |
| | | } |
| | | @Override |
| | | public StreamPushItem transform(MediaItem item) { |
| | | public StreamPushItem transform(OnStreamChangedHookParam item) { |
| | | StreamPushItem streamPushItem = new StreamPushItem(); |
| | | streamPushItem.setApp(item.getApp()); |
| | | streamPushItem.setMediaServerId(item.getMediaServerId()); |
| | |
| | | @Override |
| | | public boolean stop(String app, String streamId) { |
| | | StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); |
| | | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); |
| | | if (streamPushItem != null) { |
| | | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); |
| | | } |
| | | |
| | | platformGbStreamMapper.delByAppAndStream(app, streamId); |
| | | gbStreamMapper.del(app, streamId); |
| | |
| | | List<StreamPushItem> pushList = getPushList(mediaServerId); |
| | | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); |
| | | // redis记录 |
| | | List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); |
| | | List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); |
| | | if (pushList.size() > 0) { |
| | | for (StreamPushItem streamPushItem : pushList) { |
| | | if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { |
| | |
| | | } |
| | | } |
| | | } |
| | | if (mediaItems.size() > 0) { |
| | | for (MediaItem mediaItem : mediaItems) { |
| | | streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); |
| | | if (onStreamChangedHookParams.size() > 0) { |
| | | for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { |
| | | streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); |
| | | } |
| | | } |
| | | // 获取所有推流鉴权信息,清理过期的 |
| | | List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); |
| | | Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); |
| | | for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { |
| | | streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); |
| | | } |
| | | zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ |
| | | if (mediaList == null) { |
| | |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | } |
| | | } |
| | | List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); |
| | |
| | | } |
| | | |
| | | } |
| | | Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); |
| | | if (offlineMediaItemList.size() > 0) { |
| | | Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); |
| | | if (offlineOnStreamChangedHookParamList.size() > 0) { |
| | | String type = "PUSH"; |
| | | for (MediaItem offlineMediaItem : offlineMediaItemList) { |
| | | for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", offlineMediaItem.getApp()); |
| | | jsonObject.put("stream", offlineMediaItem.getStream()); |
| | | jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); |
| | | jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); |
| | | } |
| | | } |
| | | |
| | | Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); |
| | | if (streamAuthorityInfos.size() > 0) { |
| | | for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); |
| | | } |
| | | } |
| | | })); |
| | |
| | | // 发送流停止消息 |
| | | String type = "PUSH"; |
| | | // 发送redis消息 |
| | | List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (streamInfoList.size() > 0) { |
| | | for (MediaItem mediaItem : streamInfoList) { |
| | | for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", mediaItem.getApp()); |
| | | jsonObject.put("stream", mediaItem.getStream()); |
| | | jsonObject.put("app", onStreamChangedHookParam.getApp()); |
| | | jsonObject.put("stream", onStreamChangedHookParam.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | |
| | | // 存储数据到stream_push表 |
| | | streamPushMapper.addAll(streamPushItems); |
| | | List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() |
| | | .filter(streamPushItem-> streamPushItem.getId() != null) |
| | | .filter(streamPushItem-> streamPushItem.getGbId() != null) |
| | | .collect(Collectors.toList()); |
| | | // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 |
| | | if (streamPushItemForGbStream.size() > 0) { |