package com.genersoft.iot.vmp.service.impl;
|
|
import com.alibaba.fastjson2.JSONObject;
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.conf.MediaConfig;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
|
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
|
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
import com.genersoft.iot.vmp.media.bean.MediaInfo;
|
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
|
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
|
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
|
import com.genersoft.iot.vmp.service.IGbStreamService;
|
import com.genersoft.iot.vmp.service.IStreamPushService;
|
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.dao.*;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
import com.github.pagehelper.PageHelper;
|
import com.github.pagehelper.PageInfo;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.event.EventListener;
|
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionStatus;
|
import org.springframework.util.ObjectUtils;
|
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
@Service
|
@DS("master")
|
public class StreamPushServiceImpl implements IStreamPushService {
|
|
private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
|
|
@Autowired
|
private GbStreamMapper gbStreamMapper;
|
|
@Autowired
|
private StreamPushMapper streamPushMapper;
|
|
@Autowired
|
private StreamProxyMapper streamProxyMapper;
|
|
@Autowired
|
private ParentPlatformMapper parentPlatformMapper;
|
|
@Autowired
|
private PlatformCatalogMapper platformCatalogMapper;
|
|
@Autowired
|
private PlatformGbStreamMapper platformGbStreamMapper;
|
|
@Autowired
|
private IGbStreamService gbStreamService;
|
|
@Autowired
|
private EventPublisher eventPublisher;
|
|
@Autowired
|
private IRedisCatchStorage redisCatchStorage;
|
|
@Autowired
|
private UserSetting userSetting;
|
|
@Autowired
|
private IMediaServerService mediaServerService;
|
|
@Autowired
|
DataSourceTransactionManager dataSourceTransactionManager;
|
|
@Autowired
|
TransactionDefinition transactionDefinition;
|
|
@Autowired
|
private MediaConfig mediaConfig;
|
|
/**
|
* 流到来的处理
|
*/
|
@Async("taskExecutor")
|
@EventListener
|
public void onApplicationEvent(MediaArrivalEvent event) {
|
MediaInfo mediaInfo = event.getMediaInfo();
|
if (mediaInfo == null) {
|
return;
|
}
|
if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
|
&& mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
|
&& mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
|
return;
|
}
|
|
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
|
if (streamAuthorityInfo == null) {
|
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
|
} else {
|
streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
|
}
|
redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
|
StreamPushItem transform = StreamPushItem.getInstance(event, userSetting.getServerId());
|
transform.setPushIng(true);
|
transform.setUpdateTime(DateUtil.getNow());
|
transform.setPushTime(DateUtil.getNow());
|
transform.setSelf(true);
|
StreamPushItem pushInDb = getPush(event.getApp(), event.getStream());
|
if (pushInDb == null) {
|
transform.setCreateTime(DateUtil.getNow());
|
streamPushMapper.add(transform);
|
}else {
|
streamPushMapper.update(transform);
|
gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId());
|
}
|
// TODO 相关的事件自行管理,不需要写入ZLMMediaListManager
|
// ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
|
// if ( channelOnlineEventLister != null) {
|
// try {
|
// channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
|
// } catch (ParseException e) {
|
// logger.error("addPush: ", e);
|
// }
|
// removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
|
// }
|
// 冗余数据,自己系统中自用
|
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event);
|
|
// 发送流变化redis消息
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("serverId", userSetting.getServerId());
|
jsonObject.put("app", event.getApp());
|
jsonObject.put("stream", event.getStream());
|
jsonObject.put("register", true);
|
jsonObject.put("mediaServerId", event.getMediaServer().getId());
|
redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
|
}
|
|
/**
|
* 流离开的处理
|
*/
|
@Async("taskExecutor")
|
@EventListener
|
public void onApplicationEvent(MediaDepartureEvent event) {
|
// 兼容流注销时类型从redis记录获取
|
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
|
event.getApp(), event.getStream(), event.getMediaServer().getId());
|
if (onStreamChangedHookParam != null) {
|
String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
|
redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream());
|
if ("PUSH".equalsIgnoreCase(type)) {
|
// 冗余数据,自己系统中自用
|
redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
|
}
|
if (type != null) {
|
// 发送流变化redis消息
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("serverId", userSetting.getServerId());
|
jsonObject.put("app", event.getApp());
|
jsonObject.put("stream", event.getStream());
|
jsonObject.put("register", false);
|
jsonObject.put("mediaServerId", event.getMediaServer().getId());
|
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
}
|
}
|
GbStream gbStream = gbStreamMapper.selectOne(event.getApp(), event.getStream());
|
if (gbStream != null) {
|
if (userSetting.isUsePushingAsStatus()) {
|
streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false);
|
eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
|
}
|
}else {
|
streamPushMapper.del(event.getApp(), event.getStream());
|
}
|
}
|
|
|
private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
|
if (streamInfoList == null || streamInfoList.isEmpty()) {
|
return null;
|
}
|
Map<String, StreamPushItem> result = new HashMap<>();
|
for (StreamInfo streamInfo : streamInfoList) {
|
// 不保存国标推理以及拉流代理的流
|
if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|
|| streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|
|| streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
|
String key = streamInfo.getApp() + "_" + streamInfo.getStream();
|
StreamPushItem streamPushItem = result.get(key);
|
if (streamPushItem == null) {
|
streamPushItem = streamPushItem.getInstance(streamInfo);
|
result.put(key, streamPushItem);
|
}
|
}
|
}
|
return new ArrayList<>(result.values());
|
}
|
|
@Override
|
public StreamPushItem transform(OnStreamChangedHookParam item) {
|
StreamPushItem streamPushItem = new StreamPushItem();
|
streamPushItem.setApp(item.getApp());
|
streamPushItem.setMediaServerId(item.getMediaServerId());
|
streamPushItem.setStream(item.getStream());
|
streamPushItem.setAliveSecond(item.getAliveSecond());
|
streamPushItem.setOriginSock(item.getOriginSock());
|
streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + "");
|
streamPushItem.setOriginType(item.getOriginType());
|
streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
|
streamPushItem.setOriginUrl(item.getOriginUrl());
|
streamPushItem.setCreateTime(DateUtil.getNow());
|
streamPushItem.setAliveSecond(item.getAliveSecond());
|
streamPushItem.setStatus(true);
|
streamPushItem.setStreamType("push");
|
streamPushItem.setVhost(item.getVhost());
|
streamPushItem.setServerId(item.getSeverId());
|
return streamPushItem;
|
}
|
|
@Override
|
public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
|
PageHelper.startPage(page, count);
|
List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
|
return new PageInfo<>(all);
|
}
|
|
@Override
|
public List<StreamPushItem> getPushList(String mediaServerId) {
|
return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
|
}
|
|
@Override
|
public boolean saveToGB(GbStream stream) {
|
stream.setStreamType("push");
|
stream.setStatus(true);
|
stream.setCreateTime(DateUtil.getNow());
|
stream.setStreamType("push");
|
stream.setMediaServerId(mediaConfig.getId());
|
int add = gbStreamMapper.add(stream);
|
return add > 0;
|
}
|
|
@Override
|
public boolean removeFromGB(GbStream stream) {
|
// 判断是否需要发送事件
|
gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
|
platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
|
int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
|
MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
|
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null);
|
if (mediaList != null && mediaList.isEmpty()) {
|
streamPushMapper.del(stream.getApp(), stream.getStream());
|
}
|
return del > 0;
|
}
|
|
|
@Override
|
public StreamPushItem getPush(String app, String streamId) {
|
return streamPushMapper.selectOne(app, streamId);
|
}
|
|
@Override
|
public boolean stop(String app, String streamId) {
|
logger.info("[推流 ] 停止流: {}/{}", app, streamId);
|
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
|
if (streamPushItem != null) {
|
gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
|
}
|
|
platformGbStreamMapper.delByAppAndStream(app, streamId);
|
gbStreamMapper.del(app, streamId);
|
int delStream = streamPushMapper.del(app, streamId);
|
if (delStream > 0) {
|
MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
|
mediaServerService.closeStreams(mediaServerItem,app, streamId);
|
}
|
return true;
|
}
|
|
@Override
|
public void zlmServerOnline(String mediaServerId) {
|
// 同步zlm推流信息
|
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
|
if (mediaServerItem == null) {
|
return;
|
}
|
// 数据库记录
|
List<StreamPushItem> pushList = getPushList(mediaServerId);
|
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
|
// redis记录
|
List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
|
Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
|
if (pushList.size() > 0) {
|
for (StreamPushItem streamPushItem : pushList) {
|
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
|
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
|
}
|
}
|
}
|
if (onStreamChangedHookParams.size() > 0) {
|
for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
|
streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
|
}
|
}
|
// 获取所有推流鉴权信息,清理过期的
|
List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
|
Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
|
for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
|
streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
|
}
|
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
|
if (mediaList == null) {
|
return;
|
}
|
List<StreamPushItem> streamPushItems = handleJSON(mediaList);
|
if (streamPushItems != null) {
|
for (StreamPushItem streamPushItem : streamPushItems) {
|
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
}
|
}
|
List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
|
if (offlinePushItems.size() > 0) {
|
String type = "PUSH";
|
int runLimit = 300;
|
if (offlinePushItems.size() > runLimit) {
|
for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
|
int toIndex = i + runLimit;
|
if (i + runLimit > offlinePushItems.size()) {
|
toIndex = offlinePushItems.size();
|
}
|
List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
|
streamPushMapper.delAll(streamPushItemsSub);
|
}
|
}else {
|
streamPushMapper.delAll(offlinePushItems);
|
}
|
|
}
|
Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
|
if (offlineOnStreamChangedHookParamList.size() > 0) {
|
String type = "PUSH";
|
for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("serverId", userSetting.getServerId());
|
jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
|
jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
|
jsonObject.put("register", false);
|
jsonObject.put("mediaServerId", mediaServerId);
|
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
// 移除redis内流的信息
|
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
|
// 冗余数据,自己系统中自用
|
redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
|
}
|
}
|
|
Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
|
if (streamAuthorityInfos.size() > 0) {
|
for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
|
// 移除redis内流的信息
|
redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
|
}
|
}
|
}
|
|
@Override
|
public void zlmServerOffline(String mediaServerId) {
|
List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
|
// 移除没有GBId的推流
|
streamPushMapper.deleteWithoutGBId(mediaServerId);
|
gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
|
// 其他的流设置未启用
|
streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
|
streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
|
// 发送流停止消息
|
String type = "PUSH";
|
// 发送redis消息
|
List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
|
if (streamInfoList.size() > 0) {
|
for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
|
// 移除redis内流的信息
|
redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("serverId", userSetting.getServerId());
|
jsonObject.put("app", onStreamChangedHookParam.getApp());
|
jsonObject.put("stream", onStreamChangedHookParam.getStream());
|
jsonObject.put("register", false);
|
jsonObject.put("mediaServerId", mediaServerId);
|
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
|
// 冗余数据,自己系统中自用
|
redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
|
}
|
}
|
}
|
|
@Override
|
public void clean() {
|
|
}
|
|
@Override
|
public boolean saveToRandomGB() {
|
List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
|
long gbId = 100001;
|
for (StreamPushItem streamPushItem : streamPushItems) {
|
streamPushItem.setStreamType("push");
|
streamPushItem.setStatus(true);
|
streamPushItem.setGbId("34020000004111" + gbId);
|
streamPushItem.setCreateTime(DateUtil.getNow());
|
gbId ++;
|
}
|
int limitCount = 30;
|
|
if (streamPushItems.size() > limitCount) {
|
for (int i = 0; i < streamPushItems.size(); i += limitCount) {
|
int toIndex = i + limitCount;
|
if (i + limitCount > streamPushItems.size()) {
|
toIndex = streamPushItems.size();
|
}
|
gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
|
}
|
}else {
|
gbStreamMapper.batchAdd(streamPushItems);
|
}
|
return true;
|
}
|
|
@Override
|
public void batchAdd(List<StreamPushItem> streamPushItems) {
|
streamPushMapper.addAll(streamPushItems);
|
gbStreamMapper.batchAdd(streamPushItems);
|
}
|
|
|
@Override
|
public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
|
// 存储数据到stream_push表
|
streamPushMapper.addAll(streamPushItems);
|
List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
|
.filter(streamPushItem-> streamPushItem.getGbId() != null)
|
.collect(Collectors.toList());
|
// 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
|
if (streamPushItemForGbStream.size() > 0) {
|
gbStreamMapper.batchAdd(streamPushItemForGbStream);
|
}
|
// 去除没有ID也就是没有存储到数据库的数据
|
List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
|
.filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
|
.collect(Collectors.toList());
|
|
if (streamPushItemsForPlatform.size() > 0) {
|
// 获取所有平台,平台和目录信息一般不会特别大量。
|
List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
|
Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
|
if (parentPlatformList.size() == 0) {
|
return;
|
}
|
for (ParentPlatform platform : parentPlatformList) {
|
Map<String, PlatformCatalog> catalogMap = new HashMap<>();
|
|
// 创建根节点
|
PlatformCatalog platformCatalog = new PlatformCatalog();
|
platformCatalog.setId(platform.getServerGBId());
|
catalogMap.put(platform.getServerGBId(), platformCatalog);
|
|
// 查询所有节点信息
|
List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
|
if (platformCatalogs.size() > 0) {
|
for (PlatformCatalog catalog : platformCatalogs) {
|
catalogMap.put(catalog.getId(), catalog);
|
}
|
}
|
platformInfoMap.put(platform.getServerGBId(), catalogMap);
|
}
|
List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
|
Map<String, List<GbStream>> platformForEvent = new HashMap<>();
|
// 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
|
for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
|
List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
|
if (platFormInfoList != null && platFormInfoList.size() > 0) {
|
for (String[] platFormInfoArray : platFormInfoList) {
|
StreamPushItem streamPushItemForPlatform = new StreamPushItem();
|
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
|
if (platFormInfoArray.length > 0) {
|
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID
|
// 不存在这个平台,则忽略导入此关联关系
|
if (platformInfoMap.get(platFormInfoArray[0]) == null
|
|| platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
|
logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
|
continue;
|
}
|
streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
|
List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
|
if (gbStreamList == null) {
|
gbStreamList = new ArrayList<>();
|
platformForEvent.put(platFormInfoArray[0], gbStreamList);
|
}
|
// 为发送通知整理数据
|
streamPushItemForPlatform.setName(streamPushItem.getName());
|
streamPushItemForPlatform.setApp(streamPushItem.getApp());
|
streamPushItemForPlatform.setStream(streamPushItem.getStream());
|
streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
|
gbStreamList.add(streamPushItemForPlatform);
|
}
|
if (platFormInfoArray.length > 1) {
|
streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
|
}
|
streamPushItemListFroPlatform.add(streamPushItemForPlatform);
|
}
|
|
}
|
}
|
if (!streamPushItemListFroPlatform.isEmpty()) {
|
platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
|
// 发送通知
|
for (String platformId : platformForEvent.keySet()) {
|
eventPublisher.catalogEventPublishForStream(
|
platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
|
}
|
}
|
}
|
}
|
|
@Override
|
public boolean batchStop(List<GbStream> gbStreams) {
|
if (gbStreams == null || gbStreams.size() == 0) {
|
return false;
|
}
|
gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
|
|
platformGbStreamMapper.delByGbStreams(gbStreams);
|
gbStreamMapper.batchDelForGbStream(gbStreams);
|
int delStream = streamPushMapper.delAllForGbStream(gbStreams);
|
if (delStream > 0) {
|
for (GbStream gbStream : gbStreams) {
|
MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
|
mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
}
|
|
}
|
return true;
|
}
|
|
@Override
|
public void allStreamOffline() {
|
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
|
if (onlinePushers.size() == 0) {
|
return;
|
}
|
streamPushMapper.setAllStreamOffline();
|
|
// 发送通知
|
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
|
}
|
|
@Override
|
public void offline(List<StreamPushItemFromRedis> offlineStreams) {
|
// 更新部分设备离线
|
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
|
streamPushMapper.offline(offlineStreams);
|
// 发送通知
|
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
|
}
|
|
@Override
|
public void online(List<StreamPushItemFromRedis> onlineStreams) {
|
// 更新部分设备上线streamPushService
|
List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
|
streamPushMapper.online(onlineStreams);
|
// 发送通知
|
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
|
}
|
|
@Override
|
public boolean add(StreamPushItem stream) {
|
stream.setUpdateTime(DateUtil.getNow());
|
stream.setCreateTime(DateUtil.getNow());
|
stream.setServerId(userSetting.getServerId());
|
stream.setMediaServerId(mediaConfig.getId());
|
stream.setSelf(true);
|
stream.setPushIng(true);
|
|
// 放在事务内执行
|
boolean result = false;
|
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
|
try {
|
int addStreamResult = streamPushMapper.add(stream);
|
if (!ObjectUtils.isEmpty(stream.getGbId())) {
|
stream.setStreamType("push");
|
gbStreamMapper.add(stream);
|
}
|
dataSourceTransactionManager.commit(transactionStatus);
|
result = true;
|
}catch (Exception e) {
|
logger.error("批量移除流与平台的关系时错误", e);
|
dataSourceTransactionManager.rollback(transactionStatus);
|
}
|
return result;
|
}
|
|
@Override
|
public List<String> getAllAppAndStream() {
|
|
return streamPushMapper.getAllAppAndStream();
|
}
|
|
@Override
|
public ResourceBaseInfo getOverview() {
|
int total = streamPushMapper.getAllCount();
|
int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
|
|
return new ResourceBaseInfo(total, online);
|
}
|
|
@Override
|
public Map<String, StreamPushItem> getAllAppAndStreamMap() {
|
return streamPushMapper.getAllAppAndStreamMap();
|
}
|
}
|