From 2b0af3be14d3f8ac28a1cb031e21dc3a69146d2b Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 25 三月 2024 17:59:09 +0800 Subject: [PATCH] 支持hook --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 234 ++++++++++++++++++++++++++++++++-------------------------- 1 files changed, 128 insertions(+), 106 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java old mode 100644 new mode 100755 index ed59230..c5b7f58 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,23 +1,28 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; +import com.alibaba.fastjson2.JSONObject; +import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.slf4j.Logger; @@ -27,12 +32,13 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; -import org.springframework.util.StringUtils; +import org.springframework.util.ObjectUtils; import java.util.*; import java.util.stream.Collectors; @Service +@DS("master") public class StreamPushServiceImpl implements IStreamPushService { private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); @@ -62,9 +68,6 @@ private EventPublisher eventPublisher; @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired @@ -83,41 +86,36 @@ private MediaConfig mediaConfig; - @Override - public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { - if (jsonData == null) { + private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) { + if (streamInfoList == null || streamInfoList.isEmpty()) { return null; } - Map<String, StreamPushItem> result = new HashMap<>(); - - List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {}); - for (MediaItem item : mediaItems) { - + for (StreamInfo streamInfo : streamInfoList) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� - if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - String key = item.getApp() + "_" + item.getStream(); + if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { + String key = streamInfo.getApp() + "_" + streamInfo.getStream(); StreamPushItem streamPushItem = result.get(key); if (streamPushItem == null) { - streamPushItem = transform(item); + streamPushItem = streamPushItem.instance(streamInfo); result.put(key, streamPushItem); } } } - return new ArrayList<>(result.values()); } + @Override - public StreamPushItem transform(MediaItem item) { + public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); streamPushItem.setApp(item.getApp()); streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setOriginSock(item.getOriginSock()); - streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); + streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + ""); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); @@ -159,15 +157,10 @@ gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); - MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); - JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); - if (mediaList != null) { - if (mediaList.getInteger("code") == 0) { - JSONArray data = mediaList.getJSONArray("data"); - if (data == null) { - streamPushMapper.del(stream.getApp(), stream.getStream()); - } - } + MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); + List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null); + if (mediaList != null && mediaList.isEmpty()) { + streamPushMapper.del(stream.getApp(), stream.getStream()); } return del > 0; } @@ -180,15 +173,18 @@ @Override public boolean stop(String app, String streamId) { + logger.info("[鎺ㄦ祦 ] 鍋滄娴侊細 {}/{}", app, streamId); StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); - gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + if (streamPushItem != null) { + gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + } platformGbStreamMapper.delByAppAndStream(app, streamId); gbStreamMapper.del(app, streamId); int delStream = streamPushMapper.del(app, streamId); if (delStream > 0) { - MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); + MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); + mediaServerService.closeStreams(mediaServerItem,app, streamId); } return true; } @@ -196,7 +192,7 @@ @Override public void zlmServerOnline(String mediaServerId) { // 鍚屾zlm鎺ㄦ祦淇℃伅 - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { return; } @@ -204,74 +200,81 @@ List<StreamPushItem> pushList = getPushList(mediaServerId); Map<String, StreamPushItem> pushItemMap = new HashMap<>(); // redis璁板綍 - List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); - Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); + List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { for (StreamPushItem streamPushItem : pushList) { - if (StringUtils.isEmpty(streamPushItem.getGbId())) { + if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } } } - if (mediaItems.size() > 0) { - for (MediaItem mediaItem : mediaItems) { - streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); + if (onStreamChangedHookParams.size() > 0) { + for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { + streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); } } - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) { - return; + // 鑾峰彇鎵�鏈夋帹娴侀壌鏉冧俊鎭紝娓呯悊杩囨湡鐨� + List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo(); + Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>(); + for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { + streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); + } + List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null); + if (mediaList == null) { + return; + } + List<StreamPushItem> streamPushItems = handleJSON(mediaList); + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = handleJSON(dataStr, mediaServerItem); - } - } - - if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { - pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - } - } - List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - int runLimit = 300; - if (offlinePushItems.size() > runLimit) { - for (int i = 0; i < offlinePushItems.size(); i += runLimit) { - int toIndex = i + runLimit; - if (i + runLimit > offlinePushItems.size()) { - toIndex = offlinePushItems.size(); - } - List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); - streamPushMapper.delAll(streamPushItemsSub); + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); } - }else { - streamPushMapper.delAll(offlinePushItems); + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); } + }else { + streamPushMapper.delAll(offlinePushItems); + } + } + Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); + if (offlineOnStreamChangedHookParamList.size() > 0) { + String type = "PUSH"; + for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); + jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); + // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 + redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); } - Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); - if (offlineMediaItemList.size() > 0) { - String type = "PUSH"; - for (MediaItem offlineMediaItem : offlineMediaItemList) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", offlineMediaItem.getApp()); - jsonObject.put("stream", offlineMediaItem.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); - } + } + + Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); + if (streamAuthorityInfos.size() > 0) { + for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); } - })); + } } @Override @@ -286,18 +289,21 @@ // 鍙戦�佹祦鍋滄娑堟伅 String type = "PUSH"; // 鍙戦�乺edis娑堟伅 - List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (streamInfoList.size() > 0) { - for (MediaItem mediaItem : streamInfoList) { + for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); + redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", mediaItem.getApp()); - jsonObject.put("stream", mediaItem.getStream()); + jsonObject.put("app", onStreamChangedHookParam.getApp()); + jsonObject.put("stream", onStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + + // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 + redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId); } } } @@ -346,7 +352,7 @@ // 瀛樺偍鏁版嵁鍒皊tream_push琛� streamPushMapper.addAll(streamPushItems); List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() - .filter(streamPushItem-> streamPushItem.getId() != null) + .filter(streamPushItem-> streamPushItem.getGbId() != null) .collect(Collectors.toList()); // 瀛樺偍鏁版嵁鍒癵b_stream琛紝 id浼氳繑鍥炲埌streamPushItemForGbStream閲� if (streamPushItemForGbStream.size() > 0) { @@ -419,7 +425,7 @@ } } - if (streamPushItemListFroPlatform.size() > 0) { + if (!streamPushItemListFroPlatform.isEmpty()) { platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); // 鍙戦�侀�氱煡 for (String platformId : platformForEvent.keySet()) { @@ -427,7 +433,6 @@ platformId, platformForEvent.get(platformId), CatalogEvent.ADD); } } - } } @@ -443,8 +448,8 @@ int delStream = streamPushMapper.delAllForGbStream(gbStreams); if (delStream > 0) { for (GbStream gbStream : gbStreams) { - MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); + mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); } } @@ -486,13 +491,16 @@ stream.setUpdateTime(DateUtil.getNow()); stream.setCreateTime(DateUtil.getNow()); stream.setServerId(userSetting.getServerId()); + stream.setMediaServerId(mediaConfig.getId()); + stream.setSelf(true); + stream.setPushIng(true); // 鏀惧湪浜嬪姟鍐呮墽琛� boolean result = false; TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { int addStreamResult = streamPushMapper.add(stream); - if (!StringUtils.isEmpty(stream.getGbId())) { + if (!ObjectUtils.isEmpty(stream.getGbId())) { stream.setStreamType("push"); gbStreamMapper.add(stream); } @@ -507,6 +515,20 @@ @Override public List<String> getAllAppAndStream() { + return streamPushMapper.getAllAppAndStream(); } + + @Override + public ResourceBaseInfo getOverview() { + int total = streamPushMapper.getAllCount(); + int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus()); + + return new ResourceBaseInfo(total, online); + } + + @Override + public Map<String, StreamPushItem> getAllAppAndStreamMap() { + return streamPushMapper.getAllAppAndStreamMap(); + } } -- Gitblit v1.8.0