old mode 100644
new mode 100755
| | |
| | | package com.genersoft.iot.vmp.service.impl; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.alibaba.fastjson.TypeReference; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.baomidou.dynamic.datasource.annotation.DS; |
| | | import com.genersoft.iot.vmp.common.StreamInfo; |
| | | import com.genersoft.iot.vmp.conf.MediaConfig; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.*; |
| | | import com.genersoft.iot.vmp.gb28181.bean.GbStream; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.*; |
| | | import com.genersoft.iot.vmp.media.bean.MediaInfo; |
| | | import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; |
| | | import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; |
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; |
| | | import com.genersoft.iot.vmp.service.IGbStreamService; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | | import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.dao.*; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; |
| | | import com.github.pagehelper.PageHelper; |
| | | import com.github.pagehelper.PageInfo; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.event.EventListener; |
| | | import org.springframework.jdbc.datasource.DataSourceTransactionManager; |
| | | import org.springframework.scheduling.annotation.Async; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.transaction.TransactionDefinition; |
| | | import org.springframework.transaction.TransactionStatus; |
| | | import org.springframework.util.StringUtils; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Service |
| | | @DS("master") |
| | | public class StreamPushServiceImpl implements IStreamPushService { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); |
| | |
| | | private EventPublisher eventPublisher; |
| | | |
| | | @Autowired |
| | | private ZLMRESTfulUtils zlmresTfulUtils; |
| | | |
| | | @Autowired |
| | | private IRedisCatchStorage redisCatchStorage; |
| | | |
| | | @Autowired |
| | |
| | | @Autowired |
| | | TransactionDefinition transactionDefinition; |
| | | |
| | | @Override |
| | | public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { |
| | | if (jsonData == null) { |
| | | return null; |
| | | @Autowired |
| | | private MediaConfig mediaConfig; |
| | | |
| | | /** |
| | | * 流到来的处理 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaArrivalEvent event) { |
| | | MediaInfo mediaInfo = event.getMediaInfo(); |
| | | if (mediaInfo == null) { |
| | | return; |
| | | } |
| | | if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal() |
| | | && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal() |
| | | && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) { |
| | | return; |
| | | } |
| | | |
| | | StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream()); |
| | | if (streamAuthorityInfo == null) { |
| | | streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event); |
| | | } else { |
| | | streamAuthorityInfo.setOriginType(mediaInfo.getOriginType()); |
| | | } |
| | | redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo); |
| | | StreamPushItem transform = StreamPushItem.getInstance(event, userSetting.getServerId()); |
| | | transform.setPushIng(true); |
| | | transform.setUpdateTime(DateUtil.getNow()); |
| | | transform.setPushTime(DateUtil.getNow()); |
| | | transform.setSelf(true); |
| | | StreamPushItem pushInDb = getPush(event.getApp(), event.getStream()); |
| | | if (pushInDb == null) { |
| | | transform.setCreateTime(DateUtil.getNow()); |
| | | streamPushMapper.add(transform); |
| | | }else { |
| | | streamPushMapper.update(transform); |
| | | gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | } |
| | | // TODO 相关的事件自行管理,不需要写入ZLMMediaListManager |
| | | // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); |
| | | // if ( channelOnlineEventLister != null) { |
| | | // try { |
| | | // channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; |
| | | // } catch (ParseException e) { |
| | | // logger.error("addPush: ", e); |
| | | // } |
| | | // removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); |
| | | // } |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); |
| | | |
| | | // 发送流变化redis消息 |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", event.getApp()); |
| | | jsonObject.put("stream", event.getStream()); |
| | | jsonObject.put("register", true); |
| | | jsonObject.put("mediaServerId", event.getMediaServer().getId()); |
| | | redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject); |
| | | } |
| | | |
| | | /** |
| | | * 流离开的处理 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaDepartureEvent event) { |
| | | // 兼容流注销时类型从redis记录获取 |
| | | MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( |
| | | event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | if (mediaInfo != null) { |
| | | String type = OriginType.values()[mediaInfo.getOriginType()].getType(); |
| | | redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); |
| | | if ("PUSH".equalsIgnoreCase(type)) { |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | } |
| | | if (type != null) { |
| | | // 发送流变化redis消息 |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", event.getApp()); |
| | | jsonObject.put("stream", event.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", event.getMediaServer().getId()); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | } |
| | | } |
| | | GbStream gbStream = gbStreamMapper.selectOne(event.getApp(), event.getStream()); |
| | | if (gbStream != null) { |
| | | if (userSetting.isUsePushingAsStatus()) { |
| | | streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false); |
| | | eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); |
| | | } |
| | | }else { |
| | | streamPushMapper.del(event.getApp(), event.getStream()); |
| | | } |
| | | } |
| | | |
| | | |
| | | private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) { |
| | | if (streamInfoList == null || streamInfoList.isEmpty()) { |
| | | return null; |
| | | } |
| | | Map<String, StreamPushItem> result = new HashMap<>(); |
| | | |
| | | List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {}); |
| | | for (MediaItem item : mediaItems) { |
| | | |
| | | for (StreamInfo streamInfo : streamInfoList) { |
| | | // 不保存国标推理以及拉流代理的流 |
| | | if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() |
| | | || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() |
| | | || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { |
| | | String key = item.getApp() + "_" + item.getStream(); |
| | | if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() |
| | | || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() |
| | | || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { |
| | | String key = streamInfo.getApp() + "_" + streamInfo.getStream(); |
| | | StreamPushItem streamPushItem = result.get(key); |
| | | if (streamPushItem == null) { |
| | | streamPushItem = transform(item); |
| | | streamPushItem = streamPushItem.getInstance(streamInfo); |
| | | result.put(key, streamPushItem); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return new ArrayList<>(result.values()); |
| | | } |
| | | |
| | | @Override |
| | | public StreamPushItem transform(MediaItem item) { |
| | | public StreamPushItem transform(OnStreamChangedHookParam item) { |
| | | StreamPushItem streamPushItem = new StreamPushItem(); |
| | | streamPushItem.setApp(item.getApp()); |
| | | streamPushItem.setMediaServerId(item.getMediaServerId()); |
| | |
| | | stream.setStreamType("push"); |
| | | stream.setStatus(true); |
| | | stream.setCreateTime(DateUtil.getNow()); |
| | | stream.setStreamType("push"); |
| | | stream.setMediaServerId(mediaConfig.getId()); |
| | | int add = gbStreamMapper.add(stream); |
| | | return add > 0; |
| | | } |
| | |
| | | gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); |
| | | platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); |
| | | int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); |
| | | JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); |
| | | if (mediaList != null) { |
| | | if (mediaList.getInteger("code") == 0) { |
| | | JSONArray data = mediaList.getJSONArray("data"); |
| | | if (data == null) { |
| | | streamPushMapper.del(stream.getApp(), stream.getStream()); |
| | | } |
| | | } |
| | | MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); |
| | | List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null); |
| | | if (mediaList != null && mediaList.isEmpty()) { |
| | | streamPushMapper.del(stream.getApp(), stream.getStream()); |
| | | } |
| | | return del > 0; |
| | | } |
| | |
| | | |
| | | @Override |
| | | public boolean stop(String app, String streamId) { |
| | | logger.info("[推流 ] 停止流: {}/{}", app, streamId); |
| | | StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); |
| | | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); |
| | | if (streamPushItem != null) { |
| | | gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); |
| | | } |
| | | |
| | | platformGbStreamMapper.delByAppAndStream(app, streamId); |
| | | gbStreamMapper.del(app, streamId); |
| | | int delStream = streamPushMapper.del(app, streamId); |
| | | if (delStream > 0) { |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); |
| | | zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); |
| | | MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); |
| | | mediaServerService.closeStreams(mediaServerItem,app, streamId); |
| | | } |
| | | return true; |
| | | } |
| | |
| | | @Override |
| | | public void zlmServerOnline(String mediaServerId) { |
| | | // 同步zlm推流信息 |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); |
| | | MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); |
| | | if (mediaServerItem == null) { |
| | | return; |
| | | } |
| | |
| | | List<StreamPushItem> pushList = getPushList(mediaServerId); |
| | | Map<String, StreamPushItem> pushItemMap = new HashMap<>(); |
| | | // redis记录 |
| | | List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); |
| | | List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); |
| | | Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>(); |
| | | if (pushList.size() > 0) { |
| | | for (StreamPushItem streamPushItem : pushList) { |
| | | if (StringUtils.isEmpty(streamPushItem.getGbId())) { |
| | | if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { |
| | | pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); |
| | | } |
| | | } |
| | | } |
| | | if (mediaItems.size() > 0) { |
| | | for (MediaItem mediaItem : mediaItems) { |
| | | streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); |
| | | if (mediaInfoList.size() > 0) { |
| | | for (MediaInfo mediaInfo : mediaInfoList) { |
| | | streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo); |
| | | } |
| | | } |
| | | zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ |
| | | if (mediaList == null) { |
| | | return; |
| | | // 获取所有推流鉴权信息,清理过期的 |
| | | List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); |
| | | Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); |
| | | for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { |
| | | streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); |
| | | } |
| | | List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null); |
| | | if (mediaList == null) { |
| | | return; |
| | | } |
| | | List<StreamPushItem> streamPushItems = handleJSON(mediaList); |
| | | if (streamPushItems != null) { |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | } |
| | | String dataStr = mediaList.getString("data"); |
| | | |
| | | Integer code = mediaList.getInteger("code"); |
| | | List<StreamPushItem> streamPushItems = null; |
| | | if (code == 0 ) { |
| | | if (dataStr != null) { |
| | | streamPushItems = handleJSON(dataStr, mediaServerItem); |
| | | } |
| | | } |
| | | |
| | | if (streamPushItems != null) { |
| | | for (StreamPushItem streamPushItem : streamPushItems) { |
| | | pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | } |
| | | } |
| | | List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); |
| | | if (offlinePushItems.size() > 0) { |
| | | String type = "PUSH"; |
| | | int runLimit = 300; |
| | | if (offlinePushItems.size() > runLimit) { |
| | | for (int i = 0; i < offlinePushItems.size(); i += runLimit) { |
| | | int toIndex = i + runLimit; |
| | | if (i + runLimit > offlinePushItems.size()) { |
| | | toIndex = offlinePushItems.size(); |
| | | } |
| | | List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); |
| | | streamPushMapper.delAll(streamPushItemsSub); |
| | | } |
| | | List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); |
| | | if (offlinePushItems.size() > 0) { |
| | | String type = "PUSH"; |
| | | int runLimit = 300; |
| | | if (offlinePushItems.size() > runLimit) { |
| | | for (int i = 0; i < offlinePushItems.size(); i += runLimit) { |
| | | int toIndex = i + runLimit; |
| | | if (i + runLimit > offlinePushItems.size()) { |
| | | toIndex = offlinePushItems.size(); |
| | | } |
| | | }else { |
| | | streamPushMapper.delAll(offlinePushItems); |
| | | List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); |
| | | streamPushMapper.delAll(streamPushItemsSub); |
| | | } |
| | | }else { |
| | | streamPushMapper.delAll(offlinePushItems); |
| | | } |
| | | |
| | | } |
| | | Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values(); |
| | | if (mediaInfos.size() > 0) { |
| | | String type = "PUSH"; |
| | | for (MediaInfo mediaInfo : mediaInfos) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", mediaInfo.getApp()); |
| | | jsonObject.put("stream", mediaInfo.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId()); |
| | | } |
| | | Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); |
| | | if (offlineMediaItemList.size() > 0) { |
| | | String type = "PUSH"; |
| | | for (MediaItem offlineMediaItem : offlineMediaItemList) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", offlineMediaItem.getApp()); |
| | | jsonObject.put("stream", offlineMediaItem.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); |
| | | } |
| | | } |
| | | |
| | | Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); |
| | | if (streamAuthorityInfos.size() > 0) { |
| | | for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); |
| | | } |
| | | })); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | // 发送流停止消息 |
| | | String type = "PUSH"; |
| | | // 发送redis消息 |
| | | List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (streamInfoList.size() > 0) { |
| | | for (MediaItem mediaItem : streamInfoList) { |
| | | List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); |
| | | if (mediaInfoList.size() > 0) { |
| | | for (MediaInfo mediaInfo : mediaInfoList) { |
| | | // 移除redis内流的信息 |
| | | redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); |
| | | redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("serverId", userSetting.getServerId()); |
| | | jsonObject.put("app", mediaItem.getApp()); |
| | | jsonObject.put("stream", mediaItem.getStream()); |
| | | jsonObject.put("app", mediaInfo.getApp()); |
| | | jsonObject.put("stream", mediaInfo.getStream()); |
| | | jsonObject.put("register", false); |
| | | jsonObject.put("mediaServerId", mediaServerId); |
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject); |
| | | |
| | | // 冗余数据,自己系统中自用 |
| | | redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId); |
| | | } |
| | | } |
| | | } |
| | |
| | | gbStreamMapper.batchAdd(streamPushItems); |
| | | } |
| | | |
| | | |
| | | @Override |
| | | public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { |
| | | // 存储数据到stream_push表 |
| | | streamPushMapper.addAll(streamPushItems); |
| | | List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() |
| | | .filter(streamPushItem-> streamPushItem.getId() != null) |
| | | .filter(streamPushItem-> streamPushItem.getGbId() != null) |
| | | .collect(Collectors.toList()); |
| | | // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 |
| | | if (streamPushItemForGbStream.size() > 0) { |
| | |
| | | |
| | | } |
| | | } |
| | | if (streamPushItemListFroPlatform.size() > 0) { |
| | | if (!streamPushItemListFroPlatform.isEmpty()) { |
| | | platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); |
| | | // 发送通知 |
| | | for (String platformId : platformForEvent.keySet()) { |
| | |
| | | platformId, platformForEvent.get(platformId), CatalogEvent.ADD); |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | |
| | | int delStream = streamPushMapper.delAllForGbStream(gbStreams); |
| | | if (delStream > 0) { |
| | | for (GbStream gbStream : gbStreams) { |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); |
| | | zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); |
| | | MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); |
| | | mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); |
| | | } |
| | | |
| | | } |
| | |
| | | stream.setUpdateTime(DateUtil.getNow()); |
| | | stream.setCreateTime(DateUtil.getNow()); |
| | | stream.setServerId(userSetting.getServerId()); |
| | | stream.setMediaServerId(mediaConfig.getId()); |
| | | stream.setSelf(true); |
| | | stream.setPushIng(true); |
| | | |
| | | // 放在事务内执行 |
| | | boolean result = false; |
| | | TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); |
| | | try { |
| | | int addStreamResult = streamPushMapper.add(stream); |
| | | if (!StringUtils.isEmpty(stream.getGbId())) { |
| | | if (!ObjectUtils.isEmpty(stream.getGbId())) { |
| | | stream.setStreamType("push"); |
| | | gbStreamMapper.add(stream); |
| | | } |
| | |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | @Override |
| | | public List<String> getAllAppAndStream() { |
| | | |
| | | return streamPushMapper.getAllAppAndStream(); |
| | | } |
| | | |
| | | @Override |
| | | public ResourceBaseInfo getOverview() { |
| | | int total = streamPushMapper.getAllCount(); |
| | | int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus()); |
| | | |
| | | return new ResourceBaseInfo(total, online); |
| | | } |
| | | |
| | | @Override |
| | | public Map<String, StreamPushItem> getAllAppAndStreamMap() { |
| | | return streamPushMapper.getAllAppAndStreamMap(); |
| | | } |
| | | |
| | | @Override |
| | | public void updatePush(OnStreamChangedHookParam param) { |
| | | StreamPushItem transform = transform(param); |
| | | StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); |
| | | transform.setPushIng(param.isRegist()); |
| | | transform.setUpdateTime(DateUtil.getNow()); |
| | | transform.setPushTime(DateUtil.getNow()); |
| | | transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); |
| | | if (pushInDb == null) { |
| | | transform.setCreateTime(DateUtil.getNow()); |
| | | streamPushMapper.add(transform); |
| | | }else { |
| | | streamPushMapper.update(transform); |
| | | gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); |
| | | } |
| | | } |
| | | } |