package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; import java.util.*; import java.util.stream.Collectors; @Service @DS("master") public class StreamPushServiceImpl implements IStreamPushService { private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); @Autowired private GbStreamMapper gbStreamMapper; @Autowired private StreamPushMapper streamPushMapper; @Autowired private StreamProxyMapper streamProxyMapper; @Autowired private ParentPlatformMapper parentPlatformMapper; @Autowired private PlatformCatalogMapper platformCatalogMapper; @Autowired private PlatformGbStreamMapper platformGbStreamMapper; @Autowired private IGbStreamService gbStreamService; @Autowired private EventPublisher eventPublisher; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private UserSetting userSetting; @Autowired private IMediaServerService mediaServerService; @Autowired DataSourceTransactionManager dataSourceTransactionManager; @Autowired TransactionDefinition transactionDefinition; @Autowired private MediaConfig mediaConfig; /** * 流到来的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaArrivalEvent event) { MediaInfo mediaInfo = event.getMediaInfo(); if (mediaInfo == null) { return; } if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal() && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal() && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) { return; } StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event); } else { streamAuthorityInfo.setOriginType(mediaInfo.getOriginType()); } redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo); StreamPushItem transform = StreamPushItem.getInstance(event, userSetting.getServerId()); transform.setPushIng(true); transform.setUpdateTime(DateUtil.getNow()); transform.setPushTime(DateUtil.getNow()); transform.setSelf(true); StreamPushItem pushInDb = getPush(event.getApp(), event.getStream()); if (pushInDb == null) { transform.setCreateTime(DateUtil.getNow()); streamPushMapper.add(transform); }else { streamPushMapper.update(transform); gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId()); } // TODO 相关的事件自行管理,不需要写入ZLMMediaListManager // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); // if ( channelOnlineEventLister != null) { // try { // channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; // } catch (ParseException e) { // logger.error("addPush: ", e); // } // removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); // } // 冗余数据,自己系统中自用 redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); // 发送流变化redis消息 JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", event.getApp()); jsonObject.put("stream", event.getStream()); jsonObject.put("register", true); jsonObject.put("mediaServerId", event.getMediaServer().getId()); redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject); } /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { // 兼容流注销时类型从redis记录获取 MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( event.getApp(), event.getStream(), event.getMediaServer().getId()); if (mediaInfo != null) { String type = OriginType.values()[mediaInfo.getOriginType()].getType(); redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); if ("PUSH".equalsIgnoreCase(type)) { // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId()); } if (type != null) { // 发送流变化redis消息 JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", event.getApp()); jsonObject.put("stream", event.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", event.getMediaServer().getId()); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } GbStream gbStream = gbStreamMapper.selectOne(event.getApp(), event.getStream()); if (gbStream != null) { if (userSetting.isUsePushingAsStatus()) { streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false); eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); } }else { streamPushMapper.del(event.getApp(), event.getStream()); } } private List handleJSON(List streamInfoList) { if (streamInfoList == null || streamInfoList.isEmpty()) { return null; } Map result = new HashMap<>(); for (StreamInfo streamInfo : streamInfoList) { // 不保存国标推理以及拉流代理的流 if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { String key = streamInfo.getApp() + "_" + streamInfo.getStream(); StreamPushItem streamPushItem = result.get(key); if (streamPushItem == null) { streamPushItem = streamPushItem.getInstance(streamInfo); result.put(key, streamPushItem); } } } return new ArrayList<>(result.values()); } @Override public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); streamPushItem.setApp(item.getApp()); streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); streamPushItem.setVhost(item.getVhost()); streamPushItem.setServerId(item.getSeverId()); return streamPushItem; } @Override public PageInfo getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) { PageHelper.startPage(page, count); List all = streamPushMapper.selectAllForList(query, pushing, mediaServerId); return new PageInfo<>(all); } @Override public List getPushList(String mediaServerId) { return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); } @Override public boolean saveToGB(GbStream stream) { stream.setStreamType("push"); stream.setStatus(true); stream.setCreateTime(DateUtil.getNow()); stream.setStreamType("push"); stream.setMediaServerId(mediaConfig.getId()); int add = gbStreamMapper.add(stream); return add > 0; } @Override public boolean removeFromGB(GbStream stream) { // 判断是否需要发送事件 gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); List mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null); if (mediaList != null && mediaList.isEmpty()) { streamPushMapper.del(stream.getApp(), stream.getStream()); } return del > 0; } @Override public StreamPushItem getPush(String app, String streamId) { return streamPushMapper.selectOne(app, streamId); } @Override public boolean stop(String app, String streamId) { logger.info("[推流 ] 停止流: {}/{}", app, streamId); StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); if (streamPushItem != null) { gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); } platformGbStreamMapper.delByAppAndStream(app, streamId); gbStreamMapper.del(app, streamId); int delStream = streamPushMapper.del(app, streamId); if (delStream > 0) { MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); mediaServerService.closeStreams(mediaServerItem,app, streamId); } return true; } @Override public void zlmServerOnline(String mediaServerId) { // 同步zlm推流信息 MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { return; } // 数据库记录 List pushList = getPushList(mediaServerId); Map pushItemMap = new HashMap<>(); // redis记录 List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); Map streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { for (StreamPushItem streamPushItem : pushList) { if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } } } if (mediaInfoList.size() > 0) { for (MediaInfo mediaInfo : mediaInfoList) { streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo); } } // 获取所有推流鉴权信息,清理过期的 List allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); Map streamAuthorityInfoInfoMap = new HashMap<>(); for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); } List mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null); if (mediaList == null) { return; } List streamPushItems = handleJSON(mediaList); if (streamPushItems != null) { for (StreamPushItem streamPushItem : streamPushItems) { pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } } List offlinePushItems = new ArrayList<>(pushItemMap.values()); if (offlinePushItems.size() > 0) { String type = "PUSH"; int runLimit = 300; if (offlinePushItems.size() > runLimit) { for (int i = 0; i < offlinePushItems.size(); i += runLimit) { int toIndex = i + runLimit; if (i + runLimit > offlinePushItems.size()) { toIndex = offlinePushItems.size(); } List streamPushItemsSub = offlinePushItems.subList(i, toIndex); streamPushMapper.delAll(streamPushItemsSub); } }else { streamPushMapper.delAll(offlinePushItems); } } Collection mediaInfos = streamInfoPushItemMap.values(); if (mediaInfos.size() > 0) { String type = "PUSH"; for (MediaInfo mediaInfo : mediaInfos) { JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", mediaInfo.getApp()); jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 移除redis内流的信息 redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId()); } } Collection streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); if (streamAuthorityInfos.size() > 0) { for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { // 移除redis内流的信息 redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); } } } @Override public void zlmServerOffline(String mediaServerId) { List streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); // 移除没有GBId的推流 streamPushMapper.deleteWithoutGBId(mediaServerId); gbStreamMapper.deleteWithoutGBId("push", mediaServerId); // 其他的流设置未启用 streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); // 发送流停止消息 String type = "PUSH"; // 发送redis消息 List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (mediaInfoList.size() > 0) { for (MediaInfo mediaInfo : mediaInfoList) { // 移除redis内流的信息 redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", mediaInfo.getApp()); jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId); } } } @Override public void clean() { } @Override public boolean saveToRandomGB() { List streamPushItems = streamPushMapper.selectAll(); long gbId = 100001; for (StreamPushItem streamPushItem : streamPushItems) { streamPushItem.setStreamType("push"); streamPushItem.setStatus(true); streamPushItem.setGbId("34020000004111" + gbId); streamPushItem.setCreateTime(DateUtil.getNow()); gbId ++; } int limitCount = 30; if (streamPushItems.size() > limitCount) { for (int i = 0; i < streamPushItems.size(); i += limitCount) { int toIndex = i + limitCount; if (i + limitCount > streamPushItems.size()) { toIndex = streamPushItems.size(); } gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); } }else { gbStreamMapper.batchAdd(streamPushItems); } return true; } @Override public void batchAdd(List streamPushItems) { streamPushMapper.addAll(streamPushItems); gbStreamMapper.batchAdd(streamPushItems); } @Override public void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll ) { // 存储数据到stream_push表 streamPushMapper.addAll(streamPushItems); List streamPushItemForGbStream = streamPushItems.stream() .filter(streamPushItem-> streamPushItem.getGbId() != null) .collect(Collectors.toList()); // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 if (streamPushItemForGbStream.size() > 0) { gbStreamMapper.batchAdd(streamPushItemForGbStream); } // 去除没有ID也就是没有存储到数据库的数据 List streamPushItemsForPlatform = streamPushItemForGbStream.stream() .filter(streamPushItem-> streamPushItem.getGbStreamId() != null) .collect(Collectors.toList()); if (streamPushItemsForPlatform.size() > 0) { // 获取所有平台,平台和目录信息一般不会特别大量。 List parentPlatformList = parentPlatformMapper.getParentPlatformList(); Map> platformInfoMap = new HashMap<>(); if (parentPlatformList.size() == 0) { return; } for (ParentPlatform platform : parentPlatformList) { Map catalogMap = new HashMap<>(); // 创建根节点 PlatformCatalog platformCatalog = new PlatformCatalog(); platformCatalog.setId(platform.getServerGBId()); catalogMap.put(platform.getServerGBId(), platformCatalog); // 查询所有节点信息 List platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId()); if (platformCatalogs.size() > 0) { for (PlatformCatalog catalog : platformCatalogs) { catalogMap.put(catalog.getId(), catalog); } } platformInfoMap.put(platform.getServerGBId(), catalogMap); } List streamPushItemListFroPlatform = new ArrayList<>(); Map> platformForEvent = new HashMap<>(); // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入 for (StreamPushItem streamPushItem : streamPushItemsForPlatform) { List platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); if (platFormInfoList != null && platFormInfoList.size() > 0) { for (String[] platFormInfoArray : platFormInfoList) { StreamPushItem streamPushItemForPlatform = new StreamPushItem(); streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); if (platFormInfoArray.length > 0) { // 数组 platFormInfoArray 0 为平台ID。 1为目录ID // 不存在这个平台,则忽略导入此关联关系 if (platformInfoMap.get(platFormInfoArray[0]) == null || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); continue; } streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); List gbStreamList = platformForEvent.get(platFormInfoArray[0]); if (gbStreamList == null) { gbStreamList = new ArrayList<>(); platformForEvent.put(platFormInfoArray[0], gbStreamList); } // 为发送通知整理数据 streamPushItemForPlatform.setName(streamPushItem.getName()); streamPushItemForPlatform.setApp(streamPushItem.getApp()); streamPushItemForPlatform.setStream(streamPushItem.getStream()); streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); gbStreamList.add(streamPushItemForPlatform); } if (platFormInfoArray.length > 1) { streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); } streamPushItemListFroPlatform.add(streamPushItemForPlatform); } } } if (!streamPushItemListFroPlatform.isEmpty()) { platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); // 发送通知 for (String platformId : platformForEvent.keySet()) { eventPublisher.catalogEventPublishForStream( platformId, platformForEvent.get(platformId), CatalogEvent.ADD); } } } } @Override public boolean batchStop(List gbStreams) { if (gbStreams == null || gbStreams.size() == 0) { return false; } gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL); platformGbStreamMapper.delByGbStreams(gbStreams); gbStreamMapper.batchDelForGbStream(gbStreams); int delStream = streamPushMapper.delAllForGbStream(gbStreams); if (delStream > 0) { for (GbStream gbStream : gbStreams) { MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); } } return true; } @Override public void allStreamOffline() { List onlinePushers = streamPushMapper.getOnlinePusherForGb(); if (onlinePushers.size() == 0) { return; } streamPushMapper.setAllStreamOffline(); // 发送通知 eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); } @Override public void offline(List offlineStreams) { // 更新部分设备离线 List onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); streamPushMapper.offline(offlineStreams); // 发送通知 eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); } @Override public void online(List onlineStreams) { // 更新部分设备上线streamPushService List onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); streamPushMapper.online(onlineStreams); // 发送通知 eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); } @Override public boolean add(StreamPushItem stream) { stream.setUpdateTime(DateUtil.getNow()); stream.setCreateTime(DateUtil.getNow()); stream.setServerId(userSetting.getServerId()); stream.setMediaServerId(mediaConfig.getId()); stream.setSelf(true); stream.setPushIng(true); // 放在事务内执行 boolean result = false; TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { int addStreamResult = streamPushMapper.add(stream); if (!ObjectUtils.isEmpty(stream.getGbId())) { stream.setStreamType("push"); gbStreamMapper.add(stream); } dataSourceTransactionManager.commit(transactionStatus); result = true; }catch (Exception e) { logger.error("批量移除流与平台的关系时错误", e); dataSourceTransactionManager.rollback(transactionStatus); } return result; } @Override public List getAllAppAndStream() { return streamPushMapper.getAllAppAndStream(); } @Override public ResourceBaseInfo getOverview() { int total = streamPushMapper.getAllCount(); int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus()); return new ResourceBaseInfo(total, online); } @Override public Map getAllAppAndStreamMap() { return streamPushMapper.getAllAppAndStreamMap(); } @Override public void updatePush(OnStreamChangedHookParam param) { StreamPushItem transform = transform(param); StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); transform.setPushIng(param.isRegist()); transform.setUpdateTime(DateUtil.getNow()); transform.setPushTime(DateUtil.getNow()); transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); if (pushInDb == null) { transform.setCreateTime(DateUtil.getNow()); streamPushMapper.add(transform); }else { streamPushMapper.update(transform); gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); } } }