From a42dda2bd3cc1cf8c20cc61e7ad9211eadecbaf3 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 24 二月 2022 16:55:06 +0800 Subject: [PATCH] 规范数据库,添加必要约束,优化通道批量导入功能 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 324 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 268 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index dcca0e5..8a015d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -5,26 +5,30 @@ import com.alibaba.fastjson.TypeReference; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.OriginType; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.storager.dao.*; +import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import java.util.*; +import java.util.stream.Collectors; @Service public class StreamPushServiceImpl implements IStreamPushService { @@ -36,7 +40,19 @@ private StreamPushMapper streamPushMapper; @Autowired + private ParentPlatformMapper parentPlatformMapper; + + @Autowired + private PlatformCatalogMapper platformCatalogMapper; + + @Autowired private PlatformGbStreamMapper platformGbStreamMapper; + + @Autowired + private IGbStreamService gbStreamService; + + @Autowired + private EventPublisher eventPublisher; @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -70,7 +86,6 @@ result.put(key, streamPushItem); } } - } return new ArrayList<>(result.values()); @@ -82,13 +97,12 @@ streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setCreateStamp(item.getCreateStamp()); streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setCreateStamp(item.getCreateStamp() * 1000); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); @@ -97,28 +111,55 @@ } @Override - public PageInfo<StreamPushItem> getPushList(Integer page, Integer count) { + public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) { PageHelper.startPage(page, count); - List<StreamPushItem> all = streamPushMapper.selectAll(); + List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId); return new PageInfo<>(all); } @Override public List<StreamPushItem> getPushList(String mediaServerId) { - return streamPushMapper.selectAllByMediaServerId(mediaServerId); + return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); } @Override public boolean saveToGB(GbStream stream) { stream.setStreamType("push"); stream.setStatus(true); + stream.setCreateStamp(System.currentTimeMillis()); int add = gbStreamMapper.add(stream); + + // 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴 + List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + stream.setCatalogId(parentPlatform.getCatalogId()); + stream.setPlatformId(parentPlatform.getServerGBId()); + String streamId = stream.getStream(); + StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); + if (streamProxyItem == null) { + platformGbStreamMapper.add(stream); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); + }else { + if (!streamProxyItem.getGbId().equals(stream.getGbId())) { + // 姝ゆ祦浣跨敤鍙︿竴涓浗鏍嘔d宸茬粡涓庤骞冲彴鍏宠仈锛岀Щ闄ゆ璁板綍 + platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); + platformGbStreamMapper.add(stream); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); + } + } + } + } + return add > 0; } @Override public boolean removeFromGB(GbStream stream) { + // 鍒ゆ柇鏄惁闇�瑕佸彂閫佷簨浠� + gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); + platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); if (mediaList == null) { @@ -137,6 +178,8 @@ @Override public boolean stop(String app, String streamId) { StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); + gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + int delStream = streamPushMapper.del(app, streamId); gbStreamMapper.del(app, streamId); platformGbStreamMapper.delByAppAndStream(app, streamId); @@ -154,52 +197,81 @@ if (mediaServerItem == null) { return; } + // 鏁版嵁搴撹褰� List<StreamPushItem> pushList = getPushList(mediaServerId); + Map<String, StreamPushItem> pushItemMap = new HashMap<>(); + // redis璁板綍 + List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { - Map<String, StreamPushItem> pushItemMap = new HashMap<>(); for (StreamPushItem streamPushItem : pushList) { - pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); + if (StringUtils.isEmpty(streamPushItem.getGbId())) { + pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); + } } - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) return; - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = handleJSON(dataStr, mediaServerItem); - } - } - - if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { - pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - } - } - Collection<StreamPushItem> offlinePushItems = pushItemMap.values(); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - streamPushMapper.delAll(new ArrayList<>(offlinePushItems)); - for (StreamPushItem offlinePushItem : offlinePushItems) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", offlinePushItem.getApp()); - jsonObject.put("stream", offlinePushItem.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream()); - } - } - })); } + if (mediaItems.size() > 0) { + for (MediaItem mediaItem : mediaItems) { + streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); + } + } + zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ + if (mediaList == null) return; + String dataStr = mediaList.getString("data"); + + Integer code = mediaList.getInteger("code"); + List<StreamPushItem> streamPushItems = null; + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = handleJSON(dataStr, mediaServerItem); + } + } + + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + } + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); + } + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); + } + }else { + streamPushMapper.delAll(offlinePushItems); + } + + } + Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); + if (offlineMediaItemList.size() > 0) { + String type = "PUSH"; + for (MediaItem offlineMediaItem : offlineMediaItemList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", offlineMediaItem.getApp()); + jsonObject.put("stream", offlineMediaItem.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); + } + } + })); } @Override public void zlmServerOffline(String mediaServerId) { - List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId); + List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); // 绉婚櫎娌℃湁GBId鐨勬帹娴� streamPushMapper.deleteWithoutGBId(mediaServerId); gbStreamMapper.deleteWithoutGBId("push", mediaServerId); @@ -208,18 +280,18 @@ // 鍙戦�佹祦鍋滄娑堟伅 String type = "PUSH"; // 鍙戦�乺edis娑堟伅 - List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (streamInfoList.size() > 0) { - for (StreamInfo streamInfo : streamInfoList) { + for (MediaItem mediaItem : streamInfoList) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", streamInfo.getApp()); - jsonObject.put("stream", streamInfo.getStreamId()); + jsonObject.put("app", mediaItem.getApp()); + jsonObject.put("stream", mediaItem.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); } } } @@ -228,4 +300,144 @@ public void clean() { } + + @Override + public boolean saveToRandomGB() { + List<StreamPushItem> streamPushItems = streamPushMapper.selectAll(); + long gbId = 100001; + for (StreamPushItem streamPushItem : streamPushItems) { + streamPushItem.setStreamType("push"); + streamPushItem.setStatus(true); + streamPushItem.setGbId("34020000004111" + gbId); + streamPushItem.setCreateStamp(System.currentTimeMillis()); + gbId ++; + } + int limitCount = 30; + + if (streamPushItems.size() > limitCount) { + for (int i = 0; i < streamPushItems.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > streamPushItems.size()) { + toIndex = streamPushItems.size(); + } + gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); + } + }else { + gbStreamMapper.batchAdd(streamPushItems); + } + return true; + } + + @Override + public void batchAdd(List<StreamPushItem> streamPushItems) { + streamPushMapper.addAll(streamPushItems); + gbStreamMapper.batchAdd(streamPushItems); + // 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴 + List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); + if (parentPlatforms.size() > 0) { + for (StreamPushItem stream : streamPushItems) { + for (ParentPlatform parentPlatform : parentPlatforms) { + stream.setCatalogId(parentPlatform.getCatalogId()); + stream.setPlatformId(parentPlatform.getServerGBId()); + String streamId = stream.getStream(); + StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); + if (streamProxyItem == null) { + platformGbStreamMapper.add(stream); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); + }else { + if (!streamProxyItem.getGbId().equals(stream.getGbId())) { + // 姝ゆ祦浣跨敤鍙︿竴涓浗鏍嘔d宸茬粡涓庤骞冲彴鍏宠仈锛岀Щ闄ゆ璁板綍 + platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); + platformGbStreamMapper.add(stream); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); + stream.setGbId(streamProxyItem.getGbId()); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL); + } + } + } + } + } + } + + @Override + public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { + // 瀛樺偍鏁版嵁鍒皊tream_push琛� + streamPushMapper.addAll(streamPushItems); + List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() + .filter(streamPushItem-> streamPushItem.getId() != null) + .collect(Collectors.toList()); + // 瀛樺偍鏁版嵁鍒癵b_stream琛紝 id浼氳繑鍥炲埌streamPushItemForGbStream閲� + if (streamPushItemForGbStream.size() > 0) { + gbStreamMapper.batchAdd(streamPushItemForGbStream); + } + // 鍘婚櫎娌℃湁ID涔熷氨鏄病鏈夊瓨鍌ㄥ埌鏁版嵁搴撶殑鏁版嵁 + List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream() + .filter(streamPushItem-> streamPushItem.getGbStreamId() != null) + .collect(Collectors.toList()); + + if (streamPushItemsForPlatform.size() > 0) { + List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>(); + Map<String, List<StreamPushItem>> platformForEvent = new HashMap<>(); + // 閬嶅巻瀛樺偍缁撴灉锛屾煡鎵綼pp+Stream->platformId+catalogId鐨勫搴斿叧绯伙紝鐒跺悗鎵ц鎵归噺鍐欏叆 + for (StreamPushItem streamPushItem : streamPushItemsForPlatform) { + List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); + if (platFormInfoList != null) { + if (platFormInfoList.size() > 0) { + for (String[] platFormInfoArray : platFormInfoList) { + StreamPushItem streamPushItemForPlatform = new StreamPushItem(); + streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); + if (platFormInfoArray.length > 0) { + // 鏁扮粍 platFormInfoArray 0 涓哄钩鍙癐D銆� 1涓虹洰褰旾D + streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); + + List<StreamPushItem> streamPushItemsInPlatform = platformForEvent.get(streamPushItem.getPlatformId()); + if (streamPushItemsInPlatform == null) { + streamPushItemsInPlatform = new ArrayList<>(); + platformForEvent.put(platFormInfoArray[0], streamPushItemsInPlatform); + } + // 涓哄彂閫侀�氱煡鏁寸悊鏁版嵁 + streamPushItemForPlatform.setApp(streamPushItem.getApp()); + streamPushItemForPlatform.setStream(streamPushItem.getStream()); + streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); + streamPushItemsInPlatform.add(streamPushItemForPlatform); + } + if (platFormInfoArray.length > 1) { + streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); + } + streamPushItemListFroPlatform.add(streamPushItemForPlatform); + + + } + } + + } + } + platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); + // 鍙戦�侀�氱煡 + for (String platformId : platformForEvent.keySet()) { + eventPublisher.catalogEventPublishForStream( + platformId, platformForEvent.get(platformId).toArray(new GbStream[0]), CatalogEvent.ADD); + } + } + } + + @Override + public boolean batchStop(List<GbStream> gbStreams) { + if (gbStreams == null || gbStreams.size() == 0) { + return false; + } + gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL); + + int delStream = streamPushMapper.delAllForGbStream(gbStreams); + gbStreamMapper.batchDelForGbStream(gbStreams); + platformGbStreamMapper.delByGbStreams(gbStreams); + if (delStream > 0) { + for (GbStream gbStream : gbStreams) { + MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); + zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + } + + } + return true; + } } -- Gitblit v1.8.0