From 82adc0cb23f3ee47322e78889cdaba57e9309000 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 21 三月 2023 15:55:24 +0800 Subject: [PATCH] 完善语音对讲级联 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 458 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 382 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index dcca0e5..6540e3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,33 +1,44 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; -import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.conf.UserSetup; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.TypeReference; +import com.genersoft.iot.vmp.conf.MediaConfig; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.OriginType; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; +import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.storager.dao.*; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.util.ObjectUtils; import java.util.*; +import java.util.stream.Collectors; @Service public class StreamPushServiceImpl implements IStreamPushService { + + private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); @Autowired private GbStreamMapper gbStreamMapper; @@ -36,7 +47,22 @@ private StreamPushMapper streamPushMapper; @Autowired + private StreamProxyMapper streamProxyMapper; + + @Autowired + private ParentPlatformMapper parentPlatformMapper; + + @Autowired + private PlatformCatalogMapper platformCatalogMapper; + + @Autowired private PlatformGbStreamMapper platformGbStreamMapper; + + @Autowired + private IGbStreamService gbStreamService; + + @Autowired + private EventPublisher eventPublisher; @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -45,19 +71,31 @@ private IRedisCatchStorage redisCatchStorage; @Autowired - private UserSetup userSetup; + private UserSetting userSetting; @Autowired private IMediaServerService mediaServerService; + @Autowired + DataSourceTransactionManager dataSourceTransactionManager; + + @Autowired + TransactionDefinition transactionDefinition; + + @Autowired + private MediaConfig mediaConfig; + + @Override public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { - if (jsonData == null) return null; + if (jsonData == null) { + return null; + } Map<String, StreamPushItem> result = new HashMap<>(); - List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {}); - for (MediaItem item : mediaItems) { + List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {}); + for (OnStreamChangedHookParam item : onStreamChangedHookParams) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() @@ -70,59 +108,69 @@ result.put(key, streamPushItem); } } - } return new ArrayList<>(result.values()); } @Override - public StreamPushItem transform(MediaItem item) { + public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); streamPushItem.setApp(item.getApp()); streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setCreateStamp(item.getCreateStamp()); streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); streamPushItem.setVhost(item.getVhost()); + streamPushItem.setServerId(item.getSeverId()); return streamPushItem; } @Override - public PageInfo<StreamPushItem> getPushList(Integer page, Integer count) { + public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) { PageHelper.startPage(page, count); - List<StreamPushItem> all = streamPushMapper.selectAll(); + List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId); return new PageInfo<>(all); } @Override public List<StreamPushItem> getPushList(String mediaServerId) { - return streamPushMapper.selectAllByMediaServerId(mediaServerId); + return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); } @Override public boolean saveToGB(GbStream stream) { stream.setStreamType("push"); stream.setStatus(true); + stream.setCreateTime(DateUtil.getNow()); + stream.setStreamType("push"); + stream.setMediaServerId(mediaConfig.getId()); int add = gbStreamMapper.add(stream); return add > 0; } @Override public boolean removeFromGB(GbStream stream) { + // 鍒ゆ柇鏄惁闇�瑕佸彂閫佷簨浠� + gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); + platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); - if (mediaList == null) { - streamPushMapper.del(stream.getApp(), stream.getStream()); + if (mediaList != null) { + if (mediaList.getInteger("code") == 0) { + JSONArray data = mediaList.getJSONArray("data"); + if (data == null) { + streamPushMapper.del(stream.getApp(), stream.getStream()); + } + } } return del > 0; } @@ -130,16 +178,19 @@ @Override public StreamPushItem getPush(String app, String streamId) { - return streamPushMapper.selectOne(app, streamId); } @Override public boolean stop(String app, String streamId) { StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); - int delStream = streamPushMapper.del(app, streamId); - gbStreamMapper.del(app, streamId); + if (streamPushItem != null) { + gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + } + platformGbStreamMapper.delByAppAndStream(app, streamId); + gbStreamMapper.del(app, streamId); + int delStream = streamPushMapper.del(app, streamId); if (delStream > 0) { MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); @@ -154,72 +205,119 @@ if (mediaServerItem == null) { return; } + // 鏁版嵁搴撹褰� List<StreamPushItem> pushList = getPushList(mediaServerId); + Map<String, StreamPushItem> pushItemMap = new HashMap<>(); + // redis璁板綍 + List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { - Map<String, StreamPushItem> pushItemMap = new HashMap<>(); for (StreamPushItem streamPushItem : pushList) { - pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); + if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { + pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); + } } - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) return; - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = handleJSON(dataStr, mediaServerItem); - } - } - - if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { - pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - } - } - Collection<StreamPushItem> offlinePushItems = pushItemMap.values(); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - streamPushMapper.delAll(new ArrayList<>(offlinePushItems)); - for (StreamPushItem offlinePushItem : offlinePushItems) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", offlinePushItem.getApp()); - jsonObject.put("stream", offlinePushItem.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream()); - } - } - })); } + if (onStreamChangedHookParams.size() > 0) { + for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { + streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); + } + } + // 鑾峰彇鎵�鏈夋帹娴侀壌鏉冧俊鎭紝娓呯悊杩囨湡鐨� + List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); + Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); + for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { + streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); + } + zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ + if (mediaList == null) { + return; + } + String dataStr = mediaList.getString("data"); + + Integer code = mediaList.getInteger("code"); + List<StreamPushItem> streamPushItems = null; + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = handleJSON(dataStr, mediaServerItem); + } + } + + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + } + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); + } + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); + } + }else { + streamPushMapper.delAll(offlinePushItems); + } + + } + Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); + if (offlineOnStreamChangedHookParamList.size() > 0) { + String type = "PUSH"; + for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); + jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); + } + } + + Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); + if (streamAuthorityInfos.size() > 0) { + for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); + } + } + })); } @Override public void zlmServerOffline(String mediaServerId) { - List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId); + List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); // 绉婚櫎娌℃湁GBId鐨勬帹娴� streamPushMapper.deleteWithoutGBId(mediaServerId); gbStreamMapper.deleteWithoutGBId("push", mediaServerId); // 鍏朵粬鐨勬祦璁剧疆鏈惎鐢� - gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false); + streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); + streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); // 鍙戦�佹祦鍋滄娑堟伅 String type = "PUSH"; // 鍙戦�乺edis娑堟伅 - List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (streamInfoList.size() > 0) { - for (StreamInfo streamInfo : streamInfoList) { + for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", streamInfo.getApp()); - jsonObject.put("stream", streamInfo.getStreamId()); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", onStreamChangedHookParam.getApp()); + jsonObject.put("stream", onStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); } } } @@ -228,4 +326,212 @@ public void clean() { } + + @Override + public boolean saveToRandomGB() { + List<StreamPushItem> streamPushItems = streamPushMapper.selectAll(); + long gbId = 100001; + for (StreamPushItem streamPushItem : streamPushItems) { + streamPushItem.setStreamType("push"); + streamPushItem.setStatus(true); + streamPushItem.setGbId("34020000004111" + gbId); + streamPushItem.setCreateTime(DateUtil.getNow()); + gbId ++; + } + int limitCount = 30; + + if (streamPushItems.size() > limitCount) { + for (int i = 0; i < streamPushItems.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > streamPushItems.size()) { + toIndex = streamPushItems.size(); + } + gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); + } + }else { + gbStreamMapper.batchAdd(streamPushItems); + } + return true; + } + + @Override + public void batchAdd(List<StreamPushItem> streamPushItems) { + streamPushMapper.addAll(streamPushItems); + gbStreamMapper.batchAdd(streamPushItems); + } + + + @Override + public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { + // 瀛樺偍鏁版嵁鍒皊tream_push琛� + streamPushMapper.addAll(streamPushItems); + List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() + .filter(streamPushItem-> streamPushItem.getGbId() != null) + .collect(Collectors.toList()); + // 瀛樺偍鏁版嵁鍒癵b_stream琛紝 id浼氳繑鍥炲埌streamPushItemForGbStream閲� + if (streamPushItemForGbStream.size() > 0) { + gbStreamMapper.batchAdd(streamPushItemForGbStream); + } + // 鍘婚櫎娌℃湁ID涔熷氨鏄病鏈夊瓨鍌ㄥ埌鏁版嵁搴撶殑鏁版嵁 + List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream() + .filter(streamPushItem-> streamPushItem.getGbStreamId() != null) + .collect(Collectors.toList()); + + if (streamPushItemsForPlatform.size() > 0) { + // 鑾峰彇鎵�鏈夊钩鍙帮紝骞冲彴鍜岀洰褰曚俊鎭竴鑸笉浼氱壒鍒ぇ閲忋�� + List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList(); + Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>(); + if (parentPlatformList.size() == 0) { + return; + } + for (ParentPlatform platform : parentPlatformList) { + Map<String, PlatformCatalog> catalogMap = new HashMap<>(); + + // 鍒涘缓鏍硅妭鐐� + PlatformCatalog platformCatalog = new PlatformCatalog(); + platformCatalog.setId(platform.getServerGBId()); + catalogMap.put(platform.getServerGBId(), platformCatalog); + + // 鏌ヨ鎵�鏈夎妭鐐逛俊鎭� + List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId()); + if (platformCatalogs.size() > 0) { + for (PlatformCatalog catalog : platformCatalogs) { + catalogMap.put(catalog.getId(), catalog); + } + } + platformInfoMap.put(platform.getServerGBId(), catalogMap); + } + List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>(); + Map<String, List<GbStream>> platformForEvent = new HashMap<>(); + // 閬嶅巻瀛樺偍缁撴灉锛屾煡鎵綼pp+Stream->platformId+catalogId鐨勫搴斿叧绯伙紝鐒跺悗鎵ц鎵归噺鍐欏叆 + for (StreamPushItem streamPushItem : streamPushItemsForPlatform) { + List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); + if (platFormInfoList != null && platFormInfoList.size() > 0) { + for (String[] platFormInfoArray : platFormInfoList) { + StreamPushItem streamPushItemForPlatform = new StreamPushItem(); + streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); + if (platFormInfoArray.length > 0) { + // 鏁扮粍 platFormInfoArray 0 涓哄钩鍙癐D銆� 1涓虹洰褰旾D + // 涓嶅瓨鍦ㄨ繖涓钩鍙帮紝鍒欏拷鐣ュ鍏ユ鍏宠仈鍏崇郴 + if (platformInfoMap.get(platFormInfoArray[0]) == null + || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { + logger.info("瀵煎叆鏁版嵁鏃朵笉瀛樺湪骞冲彴鎴栫洰褰晎}/{},宸插鍏ユ湭鍒嗛厤", platFormInfoArray[0], platFormInfoArray[1] ); + continue; + } + streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); + List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]); + if (gbStreamList == null) { + gbStreamList = new ArrayList<>(); + platformForEvent.put(platFormInfoArray[0], gbStreamList); + } + // 涓哄彂閫侀�氱煡鏁寸悊鏁版嵁 + streamPushItemForPlatform.setName(streamPushItem.getName()); + streamPushItemForPlatform.setApp(streamPushItem.getApp()); + streamPushItemForPlatform.setStream(streamPushItem.getStream()); + streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); + gbStreamList.add(streamPushItemForPlatform); + } + if (platFormInfoArray.length > 1) { + streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); + } + streamPushItemListFroPlatform.add(streamPushItemForPlatform); + } + + } + } + if (streamPushItemListFroPlatform.size() > 0) { + platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); + // 鍙戦�侀�氱煡 + for (String platformId : platformForEvent.keySet()) { + eventPublisher.catalogEventPublishForStream( + platformId, platformForEvent.get(platformId), CatalogEvent.ADD); + } + } + } + } + + @Override + public boolean batchStop(List<GbStream> gbStreams) { + if (gbStreams == null || gbStreams.size() == 0) { + return false; + } + gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL); + + platformGbStreamMapper.delByGbStreams(gbStreams); + gbStreamMapper.batchDelForGbStream(gbStreams); + int delStream = streamPushMapper.delAllForGbStream(gbStreams); + if (delStream > 0) { + for (GbStream gbStream : gbStreams) { + MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); + zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + } + + } + return true; + } + + @Override + public void allStreamOffline() { + List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb(); + if (onlinePushers.size() == 0) { + return; + } + streamPushMapper.setAllStreamOffline(); + + // 鍙戦�侀�氱煡 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + } + + @Override + public void offline(List<StreamPushItemFromRedis> offlineStreams) { + // 鏇存柊閮ㄥ垎璁惧绂荤嚎 + List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); + streamPushMapper.offline(offlineStreams); + // 鍙戦�侀�氱煡 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + } + + @Override + public void online(List<StreamPushItemFromRedis> onlineStreams) { + // 鏇存柊閮ㄥ垎璁惧涓婄嚎streamPushService + List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); + streamPushMapper.online(onlineStreams); + // 鍙戦�侀�氱煡 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); + } + + @Override + public boolean add(StreamPushItem stream) { + stream.setUpdateTime(DateUtil.getNow()); + stream.setCreateTime(DateUtil.getNow()); + stream.setServerId(userSetting.getServerId()); + + // 鏀惧湪浜嬪姟鍐呮墽琛� + boolean result = false; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + try { + int addStreamResult = streamPushMapper.add(stream); + if (!ObjectUtils.isEmpty(stream.getGbId())) { + stream.setStreamType("push"); + gbStreamMapper.add(stream); + } + dataSourceTransactionManager.commit(transactionStatus); + result = true; + }catch (Exception e) { + logger.error("鎵归噺绉婚櫎娴佷笌骞冲彴鐨勫叧绯绘椂閿欒", e); + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; + } + + @Override + public List<String> getAllAppAndStream() { + + return streamPushMapper.getAllAppAndStream(); + } + + @Override + public ResourceBaceInfo getOverview() { + return streamPushMapper.getOverview(userSetting.isUsePushingAsStatus()); + } } -- Gitblit v1.8.0