|  |  |  | 
|---|
|  |  |  | package com.genersoft.iot.vmp.service.impl; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson.JSONArray; | 
|---|
|  |  |  | import com.alibaba.fastjson.JSONObject; | 
|---|
|  |  |  | import com.alibaba.fastjson.TypeReference; | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSONArray; | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSONObject; | 
|---|
|  |  |  | import com.alibaba.fastjson2.TypeReference; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.conf.MediaConfig; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.conf.UserSetting; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.bean.*; | 
|---|
|  |  |  | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.dto.*; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IGbStreamService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IMediaServerService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IStreamPushService; | 
|---|
|  |  |  | 
|---|
|  |  |  | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.storager.dao.*; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.utils.DateUtil; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; | 
|---|
|  |  |  | import com.github.pagehelper.PageHelper; | 
|---|
|  |  |  | import com.github.pagehelper.PageInfo; | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | 
|---|
|  |  |  | import org.springframework.transaction.TransactionDefinition; | 
|---|
|  |  |  | import org.springframework.transaction.TransactionStatus; | 
|---|
|  |  |  | import org.springframework.util.ObjectUtils; | 
|---|
|  |  |  | import org.springframework.util.StringUtils; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  | import java.util.stream.Collectors; | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Map<String, StreamPushItem> result = new HashMap<>(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {}); | 
|---|
|  |  |  | for (MediaItem item : mediaItems) { | 
|---|
|  |  |  | List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {}); | 
|---|
|  |  |  | for (OnStreamChangedHookParam item : onStreamChangedHookParams) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 不保存国标推理以及拉流代理的流 | 
|---|
|  |  |  | if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() | 
|---|
|  |  |  | 
|---|
|  |  |  | return new ArrayList<>(result.values()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public StreamPushItem transform(MediaItem item) { | 
|---|
|  |  |  | public StreamPushItem transform(OnStreamChangedHookParam item) { | 
|---|
|  |  |  | StreamPushItem streamPushItem = new StreamPushItem(); | 
|---|
|  |  |  | streamPushItem.setApp(item.getApp()); | 
|---|
|  |  |  | streamPushItem.setMediaServerId(item.getMediaServerId()); | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public boolean stop(String app, String streamId) { | 
|---|
|  |  |  | logger.info("[推流 ] 停止流: {}/{}", app, streamId); | 
|---|
|  |  |  | StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); | 
|---|
|  |  |  | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); | 
|---|
|  |  |  | if (streamPushItem != null) { | 
|---|
|  |  |  | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | platformGbStreamMapper.delByAppAndStream(app, streamId); | 
|---|
|  |  |  | gbStreamMapper.del(app, streamId); | 
|---|
|  |  |  | 
|---|
|  |  |  | List<StreamPushItem> pushList = getPushList(mediaServerId); | 
|---|
|  |  |  | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); | 
|---|
|  |  |  | // redis记录 | 
|---|
|  |  |  | List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); | 
|---|
|  |  |  | Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); | 
|---|
|  |  |  | List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); | 
|---|
|  |  |  | Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); | 
|---|
|  |  |  | if (pushList.size() > 0) { | 
|---|
|  |  |  | for (StreamPushItem streamPushItem : pushList) { | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (mediaItems.size() > 0) { | 
|---|
|  |  |  | for (MediaItem mediaItem : mediaItems) { | 
|---|
|  |  |  | streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); | 
|---|
|  |  |  | if (onStreamChangedHookParams.size() > 0) { | 
|---|
|  |  |  | for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { | 
|---|
|  |  |  | streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 获取所有推流鉴权信息,清理过期的 | 
|---|
|  |  |  | List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); | 
|---|
|  |  |  | Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); | 
|---|
|  |  |  | for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { | 
|---|
|  |  |  | streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ | 
|---|
|  |  |  | if (mediaList == null) { | 
|---|
|  |  |  | 
|---|
|  |  |  | for (StreamPushItem streamPushItem : streamPushItems) { | 
|---|
|  |  |  | pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); | 
|---|
|  |  |  | streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); | 
|---|
|  |  |  | streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); | 
|---|
|  |  |  | if (offlineMediaItemList.size() > 0) { | 
|---|
|  |  |  | Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); | 
|---|
|  |  |  | if (offlineOnStreamChangedHookParamList.size() > 0) { | 
|---|
|  |  |  | String type = "PUSH"; | 
|---|
|  |  |  | for (MediaItem offlineMediaItem : offlineMediaItemList) { | 
|---|
|  |  |  | for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { | 
|---|
|  |  |  | JSONObject jsonObject = new JSONObject(); | 
|---|
|  |  |  | jsonObject.put("serverId", userSetting.getServerId()); | 
|---|
|  |  |  | jsonObject.put("app", offlineMediaItem.getApp()); | 
|---|
|  |  |  | jsonObject.put("stream", offlineMediaItem.getStream()); | 
|---|
|  |  |  | jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); | 
|---|
|  |  |  | jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); | 
|---|
|  |  |  | jsonObject.put("register", false); | 
|---|
|  |  |  | jsonObject.put("mediaServerId", mediaServerId); | 
|---|
|  |  |  | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); | 
|---|
|  |  |  | // 移除redis内流的信息 | 
|---|
|  |  |  | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); | 
|---|
|  |  |  | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); | 
|---|
|  |  |  | if (streamAuthorityInfos.size() > 0) { | 
|---|
|  |  |  | for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { | 
|---|
|  |  |  | // 移除redis内流的信息 | 
|---|
|  |  |  | redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | })); | 
|---|
|  |  |  | 
|---|
|  |  |  | // 发送流停止消息 | 
|---|
|  |  |  | String type = "PUSH"; | 
|---|
|  |  |  | // 发送redis消息 | 
|---|
|  |  |  | List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); | 
|---|
|  |  |  | List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); | 
|---|
|  |  |  | if (streamInfoList.size() > 0) { | 
|---|
|  |  |  | for (MediaItem mediaItem : streamInfoList) { | 
|---|
|  |  |  | for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { | 
|---|
|  |  |  | // 移除redis内流的信息 | 
|---|
|  |  |  | redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); | 
|---|
|  |  |  | redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); | 
|---|
|  |  |  | JSONObject jsonObject = new JSONObject(); | 
|---|
|  |  |  | jsonObject.put("serverId", userSetting.getServerId()); | 
|---|
|  |  |  | jsonObject.put("app", mediaItem.getApp()); | 
|---|
|  |  |  | jsonObject.put("stream", mediaItem.getStream()); | 
|---|
|  |  |  | jsonObject.put("app", onStreamChangedHookParam.getApp()); | 
|---|
|  |  |  | jsonObject.put("stream", onStreamChangedHookParam.getStream()); | 
|---|
|  |  |  | jsonObject.put("register", false); | 
|---|
|  |  |  | jsonObject.put("mediaServerId", mediaServerId); | 
|---|
|  |  |  | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); | 
|---|
|  |  |  | 
|---|
|  |  |  | // 存储数据到stream_push表 | 
|---|
|  |  |  | streamPushMapper.addAll(streamPushItems); | 
|---|
|  |  |  | List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() | 
|---|
|  |  |  | .filter(streamPushItem-> streamPushItem.getId() != null) | 
|---|
|  |  |  | .filter(streamPushItem-> streamPushItem.getGbId() != null) | 
|---|
|  |  |  | .collect(Collectors.toList()); | 
|---|
|  |  |  | // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 | 
|---|
|  |  |  | if (streamPushItemForGbStream.size() > 0) { | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public ResourceBaceInfo getOverview() { | 
|---|
|  |  |  | return streamPushMapper.getOverview(userSetting.isUsePushingAsStatus()); | 
|---|
|  |  |  | public ResourceBaseInfo getOverview() { | 
|---|
|  |  |  | int total = streamPushMapper.getAllCount(); | 
|---|
|  |  |  | int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | return new ResourceBaseInfo(total, online); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|