| | |
| | | import com.alibaba.fastjson.TypeReference; |
| | | import com.genersoft.iot.vmp.common.StreamInfo; |
| | | import com.genersoft.iot.vmp.conf.UserSetup; |
| | | import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; |
| | | import com.genersoft.iot.vmp.gb28181.bean.GbStream; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.OriginType; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.*; |
| | | import com.genersoft.iot.vmp.service.IGbStreamService; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; |
| | | import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; |
| | | import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; |
| | | import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; |
| | | import com.github.pagehelper.PageHelper; |
| | | import com.github.pagehelper.PageInfo; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.util.*; |
| | | |
| | |
| | | private StreamPushMapper streamPushMapper; |
| | | |
| | | @Autowired |
| | | private ParentPlatformMapper parentPlatformMapper; |
| | | |
| | | @Autowired |
| | | private PlatformGbStreamMapper platformGbStreamMapper; |
| | | |
| | | @Autowired |
| | | private IGbStreamService gbStreamService; |
| | | |
| | | @Autowired |
| | | private EventPublisher eventPublisher; |
| | | |
| | | @Autowired |
| | | private ZLMRESTfulUtils zlmresTfulUtils; |
| | |
| | | result.put(key, streamPushItem); |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | return new ArrayList<>(result.values()); |
| | |
| | | stream.setStreamType("push"); |
| | | stream.setStatus(true); |
| | | int add = gbStreamMapper.add(stream); |
| | | |
| | | // 查找开启了全部直播流共享的上级平台 |
| | | List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); |
| | | if (parentPlatforms.size() > 0) { |
| | | for (ParentPlatform parentPlatform : parentPlatforms) { |
| | | stream.setCatalogId(parentPlatform.getCatalogId()); |
| | | stream.setPlatformId(parentPlatform.getServerGBId()); |
| | | String streamId = stream.getStream(); |
| | | StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); |
| | | if (streamProxyItem == null) { |
| | | platformGbStreamMapper.add(stream); |
| | | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); |
| | | }else { |
| | | if (!streamProxyItem.getGbId().equals(stream.getGbId())) { |
| | | // 此流使用另一个国标Id已经与该平台关联,移除此记录 |
| | | platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); |
| | | platformGbStreamMapper.add(stream); |
| | | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return add > 0; |
| | | } |
| | | |
| | | @Override |
| | | public boolean removeFromGB(GbStream stream) { |
| | | // 判断是否需要发送事件 |
| | | gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); |
| | | int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); |
| | | platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); |
| | | JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); |
| | | if (mediaList == null) { |
| | |
| | | @Override |
| | | public boolean stop(String app, String streamId) { |
| | | StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); |
| | | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); |
| | | |
| | | int delStream = streamPushMapper.del(app, streamId); |
| | | gbStreamMapper.del(app, streamId); |
| | | platformGbStreamMapper.delByAppAndStream(app, streamId); |
| | |
| | | if (mediaServerItem == null) { |
| | | return; |
| | | } |
| | | // 数据库记录 |
| | | List<StreamPushItem> pushList = getPushList(mediaServerId); |
| | | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); |
| | | // redis记录 |
| | | List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); |
| | | if (pushList.size() > 0) { |
| | | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); |
| | | for (StreamPushItem streamPushItem : pushList) { |
| | | pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); |
| | | } |
| | | zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ |
| | | if (mediaList == null) return; |
| | | String dataStr = mediaList.getString("data"); |
| | | |
| | | Integer code = mediaList.getInteger("code"); |
| | | List<StreamPushItem> streamPushItems = null; |
| | | if (code == 0 ) { |
| | | if (dataStr != null) { |
| | | streamPushItems = handleJSON(dataStr, mediaServerItem); |
| | | } |
| | | } |
| | | |
| | | if (streamPushItems != null) { |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | } |
| | | } |
| | | Collection<StreamPushItem> offlinePushItems = pushItemMap.values(); |
| | | if (offlinePushItems.size() > 0) { |
| | | String type = "PUSH"; |
| | | streamPushMapper.delAll(new ArrayList<>(offlinePushItems)); |
| | | for (StreamPushItem offlinePushItem : offlinePushItems) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetup.getServerId()); |
| | | jsonObject.put("app", offlinePushItem.getApp()); |
| | | jsonObject.put("stream", offlinePushItem.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream()); |
| | | } |
| | | } |
| | | })); |
| | | } |
| | | if (mediaItems.size() > 0) { |
| | | for (MediaItem mediaItem : mediaItems) { |
| | | streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); |
| | | } |
| | | } |
| | | zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ |
| | | if (mediaList == null) return; |
| | | String dataStr = mediaList.getString("data"); |
| | | |
| | | Integer code = mediaList.getInteger("code"); |
| | | List<StreamPushItem> streamPushItems = null; |
| | | if (code == 0 ) { |
| | | if (dataStr != null) { |
| | | streamPushItems = handleJSON(dataStr, mediaServerItem); |
| | | } |
| | | } |
| | | |
| | | if (streamPushItems != null) { |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | } |
| | | } |
| | | List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); |
| | | if (offlinePushItems.size() > 0) { |
| | | String type = "PUSH"; |
| | | int runLimit = 300; |
| | | if (offlinePushItems.size() > runLimit) { |
| | | for (int i = 0; i < offlinePushItems.size(); i += runLimit) { |
| | | int toIndex = i + runLimit; |
| | | if (i + runLimit > offlinePushItems.size()) { |
| | | toIndex = offlinePushItems.size(); |
| | | } |
| | | List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); |
| | | streamPushMapper.delAll(streamPushItemsSub); |
| | | } |
| | | }else { |
| | | streamPushMapper.delAll(offlinePushItems); |
| | | } |
| | | |
| | | } |
| | | Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); |
| | | if (offlineMediaItemList.size() > 0) { |
| | | String type = "PUSH"; |
| | | for (MediaItem offlineMediaItem : offlineMediaItemList) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetup.getServerId()); |
| | | jsonObject.put("app", offlineMediaItem.getApp()); |
| | | jsonObject.put("stream", offlineMediaItem.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); |
| | | } |
| | | } |
| | | })); |
| | | } |
| | | |
| | | @Override |
| | |
| | | // 发送流停止消息 |
| | | String type = "PUSH"; |
| | | // 发送redis消息 |
| | | List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (streamInfoList.size() > 0) { |
| | | for (StreamInfo streamInfo : streamInfoList) { |
| | | for (MediaItem mediaItem : streamInfoList) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetup.getServerId()); |
| | | jsonObject.put("app", streamInfo.getApp()); |
| | | jsonObject.put("stream", streamInfo.getStreamId()); |
| | | jsonObject.put("app", mediaItem.getApp()); |
| | | jsonObject.put("stream", mediaItem.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); |
| | | } |
| | | } |
| | | } |
| | |
| | | public void clean() { |
| | | |
| | | } |
| | | |
| | | @Override |
| | | public boolean saveToRandomGB() { |
| | | List<StreamPushItem> streamPushItems = streamPushMapper.selectAll(); |
| | | long gbId = 100001; |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | streamPushItem.setStreamType("push"); |
| | | streamPushItem.setStatus(true); |
| | | streamPushItem.setGbId("34020000004111" + gbId); |
| | | gbId ++; |
| | | } |
| | | int limitCount = 30; |
| | | |
| | | if (streamPushItems.size() > limitCount) { |
| | | for (int i = 0; i < streamPushItems.size(); i += limitCount) { |
| | | int toIndex = i + limitCount; |
| | | if (i + limitCount > streamPushItems.size()) { |
| | | toIndex = streamPushItems.size(); |
| | | } |
| | | gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); |
| | | } |
| | | }else { |
| | | gbStreamMapper.batchAdd(streamPushItems); |
| | | } |
| | | return true; |
| | | } |
| | | } |