src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -131,7 +131,7 @@ } @Async public void catalogEventPublishForStream(String platformId, List<GbStream> gbStreams, String type) { public void catalogEventPublishForStream(String platformId, GbStream[] gbStreams, String type) { CatalogEvent outEvent = new CatalogEvent(this); outEvent.setGbStreams(gbStreams); outEvent.setType(type); @@ -141,8 +141,7 @@ @Async public void catalogEventPublishForStream(String platformId, GbStream gbStream, String type) { List<GbStream> gbStreamList = new ArrayList<>(); gbStreamList.add(gbStream); catalogEventPublishForStream(platformId, gbStreamList, type); GbStream[] gbStreams = {gbStream}; catalogEventPublishForStream(platformId, gbStreams, type); } } src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java
@@ -20,7 +20,7 @@ public static final String UPDATE = "UPDATE"; // 更新 private List<DeviceChannel> deviceChannels; private List<GbStream> gbStreams; private GbStream[] gbStreams; private String type; private String platformId; @@ -48,11 +48,11 @@ this.platformId = platformId; } public List<GbStream> getGbStreams() { public GbStream[] getGbStreams() { return gbStreams; } public void setGbStreams(List<GbStream> gbStreams) { public void setGbStreams(GbStream[] gbStreams) { this.gbStreams = gbStreams; } } src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -94,7 +94,7 @@ if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ if (event.getGbStreams() != null && event.getGbStreams().length > 0){ for (GbStream gbStream : event.getGbStreams()) { DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); deviceChannelList.add(deviceChannelByStream); @@ -134,7 +134,7 @@ if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ if (event.getGbStreams() != null && event.getGbStreams().length > 0){ for (GbStream gbStream : event.getGbStreams()) { DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); deviceChannelList.add(deviceChannelByStream); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -371,7 +371,7 @@ } } if (gbStreams.size() > 0) { eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); eventPublisher.catalogEventPublishForStream(null, gbStreams.toArray(new GbStream[0]), CatalogEvent.ON); } }else { src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -68,4 +68,6 @@ void batchAdd(List<StreamPushItem> streamPushExcelDtoList); boolean batchStop(List<GbStream> streamPushItems); void batchAddForUpload(String platformId, String catalogId, List<StreamPushItem> streamPushItems); } src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -18,10 +19,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -42,6 +40,9 @@ @Autowired private ParentPlatformMapper parentPlatformMapper; @Autowired private PlatformCatalogMapper platformCatalogMapper; @Autowired private PlatformGbStreamMapper platformGbStreamMapper; @@ -95,13 +96,12 @@ streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setCreateStamp(item.getCreateStamp()); streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); streamPushItem.setCreateStamp(item.getCreateStamp()); streamPushItem.setCreateStamp(item.getCreateStamp() * 1000); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); @@ -359,6 +359,27 @@ } @Override public void batchAddForUpload(String platformId, String catalogId, List<StreamPushItem> streamPushItems) { streamPushMapper.addAll(streamPushItems); gbStreamMapper.batchAdd(streamPushItems); if (platformId != null) { ParentPlatform platform = parentPlatformMapper.getParentPlatByServerGBId(platformId); if (platform != null) { if (catalogId == null) { catalogId = platform.getCatalogId(); }else { PlatformCatalog catalog = platformCatalogMapper.select(catalogId); if (catalog == null) { return; } } platformGbStreamMapper.batchAdd(platformId, catalogId, streamPushItems); eventPublisher.catalogEventPublishForStream(platformId, streamPushItems.toArray(new GbStream[0]), CatalogEvent.ADD); } } } @Override public boolean batchStop(List<GbStream> gbStreams) { if (gbStreams == null || gbStreams.size() == 0) { return false; src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java
@@ -7,10 +7,7 @@ import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import org.springframework.util.StringUtils; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.*; public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> { @@ -18,10 +15,13 @@ private IStreamPushService pushService; private String defaultMediaServerId; private List<StreamPushItem> streamPushItems = new ArrayList<>(); private Map<String, UploadData> streamPushItemsForPlatform = new HashMap<>(); private Set<String> streamPushStreamSet = new HashSet<>(); private Set<String> streamPushGBSet = new HashSet<>(); private List<String> errorStreamList = new ArrayList<>(); private List<String> errorGBList = new ArrayList<>(); // 读取数量计数器 private int loadedSize = 0; public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) { this.pushService = pushService; @@ -31,6 +31,16 @@ public interface ErrorDataHandler{ void handle(List<String> streams, List<String> gbId); } private class UploadData{ public String platformId; public Map<String, List<StreamPushItem>> catalogData = new HashMap<>(); public List<StreamPushItem> streamPushItems = new ArrayList<>(); public UploadData(String platformId) { this.platformId = platformId; } } @Override @@ -43,10 +53,10 @@ if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) { errorGBList.add(streamPushExcelDto.getGbId()); } if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) { if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) { errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()); } if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) { if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) { return; } @@ -62,24 +72,69 @@ streamPushItem.setOriginType(2); streamPushItem.setOriginTypeStr("rtsp_push"); streamPushItem.setTotalReaderCount("0"); streamPushItems.add(streamPushItem); streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId()); streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId()); if (StringUtils.isEmpty(streamPushExcelDto.getPlatformId())) { streamPushItems.add(streamPushItem); }else { UploadData uploadData = streamPushItemsForPlatform.get(streamPushExcelDto.getPlatformId()); if (uploadData == null) { uploadData = new UploadData(streamPushExcelDto.getPlatformId()); streamPushItemsForPlatform.put(streamPushExcelDto.getPlatformId(), uploadData); } if (!StringUtils.isEmpty(streamPushExcelDto.getCatalogId())) { List<StreamPushItem> streamPushItems = uploadData.catalogData.get(streamPushExcelDto.getCatalogId()); if (streamPushItems == null) { streamPushItems = new ArrayList<>(); uploadData.catalogData.put(streamPushExcelDto.getCatalogId(), streamPushItems); } streamPushItems.add(streamPushItem); }else { uploadData.streamPushItems.add(streamPushItem); } } streamPushGBSet.add(streamPushExcelDto.getGbId()); streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()); if (streamPushItems.size() > 300) { pushService.batchAdd(streamPushItems); // 存储完成清理 list loadedSize ++; if (loadedSize > 1000) { saveData(); streamPushItems.clear(); streamPushItemsForPlatform.clear(); loadedSize = 0; } } @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { // 这里也要保存数据,确保最后遗留的数据也存储到数据库 if (streamPushItems.size() > 0) { pushService.batchAdd(streamPushItems); } saveData(); streamPushGBSet.clear(); streamPushStreamSet.clear(); errorDataHandler.handle(errorStreamList, errorGBList); } private void saveData(){ if (streamPushItems.size() > 0) { pushService.batchAddForUpload(null, null, streamPushItems); } // 处理已分配到平台的流 if (streamPushItemsForPlatform.size() > 0){ for (String platformId : streamPushItemsForPlatform.keySet()) { UploadData uploadData = streamPushItemsForPlatform.get(platformId); if (uploadData.streamPushItems.size() > 0) { pushService.batchAddForUpload(platformId, null, uploadData.streamPushItems); } if (uploadData.catalogData.size() > 0) { for (String catalogId : uploadData.catalogData.keySet()) { if (uploadData.catalogData.get(catalogId).size() > 0) { pushService.batchAddForUpload(platformId, catalogId, uploadData.catalogData.get(catalogId)); } } } } } } } src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java
@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -18,6 +19,17 @@ @Insert("REPLACE INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" + "('${app}', '${stream}', '${platformId}', '${catalogId}')") int add(PlatformGbStream platformGbStream); @Insert("<script> " + "REPLACE into platform_gb_stream " + "(app, stream, platformId, catalogId) " + "values " + "<foreach collection='streamPushItems' index='index' item='item' separator=','> " + "('${item.app}', '${item.stream}', '${platformId}', '${catalogId}')" + "</foreach> " + "</script>") int batchAdd(String platformId, String catalogId, List<StreamPushItem> streamPushItems); @Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}") int delByAppAndStream(String app, String stream); @@ -82,4 +94,7 @@ "</foreach>" + "</script>") void delByGbStreams(List<GbStream> gbStreams); } src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java
@@ -16,6 +16,12 @@ @ExcelProperty("国标ID") private String gbId; @ExcelProperty("平台ID") private String platformId; @ExcelProperty("目录ID") private String catalogId; public String getName() { return name; } @@ -47,4 +53,21 @@ public void setGbId(String gbId) { this.gbId = gbId; } public String getPlatformId() { return platformId; } public void setPlatformId(String platformId) { this.platformId = platformId; } public String getCatalogId() { return catalogId; } public void setCatalogId(String catalogId) { this.catalogId = catalogId; } } web_src/src/components/PushVideoList.vue
@@ -233,7 +233,7 @@ dateFormat: function(/** timestamp=0 **/) { let ts = arguments[0] || 0; let t,y,m,d,h,i,s; t = ts ? new Date(ts*1000) : new Date(); t = ts ? new Date(ts) : new Date(); y = t.getFullYear(); m = t.getMonth()+1; d = t.getDate();