From bf66f8f694607fdd41ff9d7f6149459eef96bb67 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 11 二月 2022 16:45:06 +0800 Subject: [PATCH] 推流导入支持添加平台信息与目录信息 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 33 +++++++++-- src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java | 15 +++++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 2 web_src/src/components/PushVideoList.vue | 2 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java | 6 +- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java | 81 ++++++++++++++++++++++---- src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 4 src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java | 23 +++++++ src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java | 7 +- src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 2 10 files changed, 145 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 426e2e5..8c8565b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -131,7 +131,7 @@ } @Async - public void catalogEventPublishForStream(String platformId, List<GbStream> gbStreams, String type) { + public void catalogEventPublishForStream(String platformId, GbStream[] gbStreams, String type) { CatalogEvent outEvent = new CatalogEvent(this); outEvent.setGbStreams(gbStreams); outEvent.setType(type); @@ -141,8 +141,7 @@ @Async public void catalogEventPublishForStream(String platformId, GbStream gbStream, String type) { - List<GbStream> gbStreamList = new ArrayList<>(); - gbStreamList.add(gbStream); - catalogEventPublishForStream(platformId, gbStreamList, type); + GbStream[] gbStreams = {gbStream}; + catalogEventPublishForStream(platformId, gbStreams, type); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java index c035b80..e343fec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java @@ -20,7 +20,7 @@ public static final String UPDATE = "UPDATE"; // 鏇存柊 private List<DeviceChannel> deviceChannels; - private List<GbStream> gbStreams; + private GbStream[] gbStreams; private String type; private String platformId; @@ -48,11 +48,11 @@ this.platformId = platformId; } - public List<GbStream> getGbStreams() { + public GbStream[] getGbStreams() { return gbStreams; } - public void setGbStreams(List<GbStream> gbStreams) { + public void setGbStreams(GbStream[] gbStreams) { this.gbStreams = gbStreams; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 52442cc..997031b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -94,7 +94,7 @@ if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } - if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ + if (event.getGbStreams() != null && event.getGbStreams().length > 0){ for (GbStream gbStream : event.getGbStreams()) { DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); deviceChannelList.add(deviceChannelByStream); @@ -134,7 +134,7 @@ if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } - if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ + if (event.getGbStreams() != null && event.getGbStreams().length > 0){ for (GbStream gbStream : event.getGbStreams()) { DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); deviceChannelList.add(deviceChannelByStream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 6e6b7b5..34ee4a8 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -371,7 +371,7 @@ } } if (gbStreams.size() > 0) { - eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); + eventPublisher.catalogEventPublishForStream(null, gbStreams.toArray(new GbStream[0]), CatalogEvent.ON); } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 53ca6c4..1d57d46 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -68,4 +68,6 @@ void batchAdd(List<StreamPushItem> streamPushExcelDtoList); boolean batchStop(List<GbStream> streamPushItems); + + void batchAddForUpload(String platformId, String catalogId, List<StreamPushItem> streamPushItems); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 2ce9234..ff31d8a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -18,10 +19,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -42,6 +40,9 @@ @Autowired private ParentPlatformMapper parentPlatformMapper; + + @Autowired + private PlatformCatalogMapper platformCatalogMapper; @Autowired private PlatformGbStreamMapper platformGbStreamMapper; @@ -95,13 +96,12 @@ streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setCreateStamp(item.getCreateStamp()); streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setCreateStamp(item.getCreateStamp() * 1000); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); @@ -359,6 +359,27 @@ } @Override + public void batchAddForUpload(String platformId, String catalogId, List<StreamPushItem> streamPushItems) { + streamPushMapper.addAll(streamPushItems); + gbStreamMapper.batchAdd(streamPushItems); + if (platformId != null) { + ParentPlatform platform = parentPlatformMapper.getParentPlatByServerGBId(platformId); + if (platform != null) { + if (catalogId == null) { + catalogId = platform.getCatalogId(); + }else { + PlatformCatalog catalog = platformCatalogMapper.select(catalogId); + if (catalog == null) { + return; + } + } + platformGbStreamMapper.batchAdd(platformId, catalogId, streamPushItems); + eventPublisher.catalogEventPublishForStream(platformId, streamPushItems.toArray(new GbStream[0]), CatalogEvent.ADD); + } + } + } + + @Override public boolean batchStop(List<GbStream> gbStreams) { if (gbStreams == null || gbStreams.size() == 0) { return false; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java index 586b5b4..2f94709 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java @@ -7,10 +7,7 @@ import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import org.springframework.util.StringUtils; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> { @@ -18,10 +15,13 @@ private IStreamPushService pushService; private String defaultMediaServerId; private List<StreamPushItem> streamPushItems = new ArrayList<>(); + private Map<String, UploadData> streamPushItemsForPlatform = new HashMap<>(); private Set<String> streamPushStreamSet = new HashSet<>(); private Set<String> streamPushGBSet = new HashSet<>(); private List<String> errorStreamList = new ArrayList<>(); private List<String> errorGBList = new ArrayList<>(); + // 璇诲彇鏁伴噺璁℃暟鍣� + private int loadedSize = 0; public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) { this.pushService = pushService; @@ -31,6 +31,16 @@ public interface ErrorDataHandler{ void handle(List<String> streams, List<String> gbId); + } + + private class UploadData{ + public String platformId; + public Map<String, List<StreamPushItem>> catalogData = new HashMap<>(); + public List<StreamPushItem> streamPushItems = new ArrayList<>(); + + public UploadData(String platformId) { + this.platformId = platformId; + } } @Override @@ -43,10 +53,10 @@ if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) { errorGBList.add(streamPushExcelDto.getGbId()); } - if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) { + if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) { errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()); } - if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) { + if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) { return; } @@ -62,24 +72,69 @@ streamPushItem.setOriginType(2); streamPushItem.setOriginTypeStr("rtsp_push"); streamPushItem.setTotalReaderCount("0"); - streamPushItems.add(streamPushItem); + streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId()); + streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId()); + if (StringUtils.isEmpty(streamPushExcelDto.getPlatformId())) { + streamPushItems.add(streamPushItem); + }else { + UploadData uploadData = streamPushItemsForPlatform.get(streamPushExcelDto.getPlatformId()); + if (uploadData == null) { + uploadData = new UploadData(streamPushExcelDto.getPlatformId()); + streamPushItemsForPlatform.put(streamPushExcelDto.getPlatformId(), uploadData); + } + if (!StringUtils.isEmpty(streamPushExcelDto.getCatalogId())) { + List<StreamPushItem> streamPushItems = uploadData.catalogData.get(streamPushExcelDto.getCatalogId()); + if (streamPushItems == null) { + streamPushItems = new ArrayList<>(); + uploadData.catalogData.put(streamPushExcelDto.getCatalogId(), streamPushItems); + } + streamPushItems.add(streamPushItem); + }else { + uploadData.streamPushItems.add(streamPushItem); + } + + } + streamPushGBSet.add(streamPushExcelDto.getGbId()); streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()); - if (streamPushItems.size() > 300) { - pushService.batchAdd(streamPushItems); - // 瀛樺偍瀹屾垚娓呯悊 list + loadedSize ++; + if (loadedSize > 1000) { + saveData(); streamPushItems.clear(); + streamPushItemsForPlatform.clear(); + loadedSize = 0; } + } @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { // 杩欓噷涔熻淇濆瓨鏁版嵁锛岀‘淇濇渶鍚庨仐鐣欑殑鏁版嵁涔熷瓨鍌ㄥ埌鏁版嵁搴� - if (streamPushItems.size() > 0) { - pushService.batchAdd(streamPushItems); - } + saveData(); streamPushGBSet.clear(); streamPushStreamSet.clear(); errorDataHandler.handle(errorStreamList, errorGBList); } + + private void saveData(){ + if (streamPushItems.size() > 0) { + pushService.batchAddForUpload(null, null, streamPushItems); + } + // 澶勭悊宸插垎閰嶅埌骞冲彴鐨勬祦 + if (streamPushItemsForPlatform.size() > 0){ + for (String platformId : streamPushItemsForPlatform.keySet()) { + UploadData uploadData = streamPushItemsForPlatform.get(platformId); + if (uploadData.streamPushItems.size() > 0) { + pushService.batchAddForUpload(platformId, null, uploadData.streamPushItems); + } + if (uploadData.catalogData.size() > 0) { + for (String catalogId : uploadData.catalogData.keySet()) { + if (uploadData.catalogData.get(catalogId).size() > 0) { + pushService.batchAddForUpload(platformId, catalogId, uploadData.catalogData.get(catalogId)); + } + } + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java index e4639e7..1e15360 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -18,6 +19,17 @@ @Insert("REPLACE INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" + "('${app}', '${stream}', '${platformId}', '${catalogId}')") int add(PlatformGbStream platformGbStream); + + + @Insert("<script> " + + "REPLACE into platform_gb_stream " + + "(app, stream, platformId, catalogId) " + + "values " + + "<foreach collection='streamPushItems' index='index' item='item' separator=','> " + + "('${item.app}', '${item.stream}', '${platformId}', '${catalogId}')" + + "</foreach> " + + "</script>") + int batchAdd(String platformId, String catalogId, List<StreamPushItem> streamPushItems); @Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}") int delByAppAndStream(String app, String stream); @@ -82,4 +94,7 @@ "</foreach>" + "</script>") void delByGbStreams(List<GbStream> gbStreams); + + + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java index c0c7611..956f647 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java @@ -16,6 +16,12 @@ @ExcelProperty("鍥芥爣ID") private String gbId; + @ExcelProperty("骞冲彴ID") + private String platformId; + + @ExcelProperty("鐩綍ID") + private String catalogId; + public String getName() { return name; } @@ -47,4 +53,21 @@ public void setGbId(String gbId) { this.gbId = gbId; } + + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public String getCatalogId() { + return catalogId; + } + + public void setCatalogId(String catalogId) { + this.catalogId = catalogId; + } } diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 5fa162b..3831cdc 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -233,7 +233,7 @@ dateFormat: function(/** timestamp=0 **/) { let ts = arguments[0] || 0; let t,y,m,d,h,i,s; - t = ts ? new Date(ts*1000) : new Date(); + t = ts ? new Date(ts) : new Date(); y = t.getFullYear(); m = t.getMonth()+1; d = t.getDate(); -- Gitblit v1.8.0