From 150e7a31997f590eba879c3515f21821e9e68eb6 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 22 三月 2024 16:45:18 +0800 Subject: [PATCH] 调整节点管理代码结构 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 164 +++++++++++++++++++++++------------------------------- 1 files changed, 71 insertions(+), 93 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index ea56055..c5b7f58 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,21 +1,22 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.TypeReference; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -67,9 +68,6 @@ private EventPublisher eventPublisher; @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired @@ -88,32 +86,27 @@ private MediaConfig mediaConfig; - @Override - public List<StreamPushItem> handleJSON(String jsonData, MediaServer mediaServerItem) { - if (jsonData == null) { + private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) { + if (streamInfoList == null || streamInfoList.isEmpty()) { return null; } - Map<String, StreamPushItem> result = new HashMap<>(); - - List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {}); - for (OnStreamChangedHookParam item : onStreamChangedHookParams) { - + for (StreamInfo streamInfo : streamInfoList) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� - if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - String key = item.getApp() + "_" + item.getStream(); + if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { + String key = streamInfo.getApp() + "_" + streamInfo.getStream(); StreamPushItem streamPushItem = result.get(key); if (streamPushItem == null) { - streamPushItem = transform(item); + streamPushItem = streamPushItem.instance(streamInfo); result.put(key, streamPushItem); } } } - return new ArrayList<>(result.values()); } + @Override public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); @@ -165,14 +158,9 @@ platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); - JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); - if (mediaList != null) { - if (mediaList.getInteger("code") == 0) { - JSONArray data = mediaList.getJSONArray("data"); - if (data == null) { - streamPushMapper.del(stream.getApp(), stream.getStream()); - } - } + List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null); + if (mediaList != null && mediaList.isEmpty()) { + streamPushMapper.del(stream.getApp(), stream.getStream()); } return del > 0; } @@ -196,7 +184,7 @@ int delStream = streamPushMapper.del(app, streamId); if (delStream > 0) { MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); + mediaServerService.closeStreams(mediaServerItem,app, streamId); } return true; } @@ -232,71 +220,61 @@ for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); } - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) { - return; + List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null); + if (mediaList == null) { + return; + } + List<StreamPushItem> streamPushItems = handleJSON(mediaList); + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = handleJSON(dataStr, mediaServerItem); - } - } - - if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { - pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - } - } - List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - int runLimit = 300; - if (offlinePushItems.size() > runLimit) { - for (int i = 0; i < offlinePushItems.size(); i += runLimit) { - int toIndex = i + runLimit; - if (i + runLimit > offlinePushItems.size()) { - toIndex = offlinePushItems.size(); - } - List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); - streamPushMapper.delAll(streamPushItemsSub); + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); } - }else { - streamPushMapper.delAll(offlinePushItems); + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); } - - } - Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); - if (offlineOnStreamChangedHookParamList.size() > 0) { - String type = "PUSH"; - for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); - jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); - // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 - redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); - } + }else { + streamPushMapper.delAll(offlinePushItems); } - Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); - if (streamAuthorityInfos.size() > 0) { - for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); - } + } + Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); + if (offlineOnStreamChangedHookParamList.size() > 0) { + String type = "PUSH"; + for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); + jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); + // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 + redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); } - })); + } + + Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); + if (streamAuthorityInfos.size() > 0) { + for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); + } + } } @Override @@ -471,7 +449,7 @@ if (delStream > 0) { for (GbStream gbStream : gbStreams) { MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); } } -- Gitblit v1.8.0