From 2b0af3be14d3f8ac28a1cb031e21dc3a69146d2b Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 25 三月 2024 17:59:09 +0800 Subject: [PATCH] 支持hook --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 174 +++++++++++++++++++++++++-------------------------------- 1 files changed, 76 insertions(+), 98 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index e2d7e68..c5b7f58 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,21 +1,22 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.TypeReference; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -67,9 +68,6 @@ private EventPublisher eventPublisher; @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired @@ -88,32 +86,27 @@ private MediaConfig mediaConfig; - @Override - public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { - if (jsonData == null) { + private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) { + if (streamInfoList == null || streamInfoList.isEmpty()) { return null; } - Map<String, StreamPushItem> result = new HashMap<>(); - - List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {}); - for (OnStreamChangedHookParam item : onStreamChangedHookParams) { - + for (StreamInfo streamInfo : streamInfoList) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� - if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - String key = item.getApp() + "_" + item.getStream(); + if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { + String key = streamInfo.getApp() + "_" + streamInfo.getStream(); StreamPushItem streamPushItem = result.get(key); if (streamPushItem == null) { - streamPushItem = transform(item); + streamPushItem = streamPushItem.instance(streamInfo); result.put(key, streamPushItem); } } } - return new ArrayList<>(result.values()); } + @Override public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); @@ -122,7 +115,7 @@ streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setOriginSock(item.getOriginSock()); - streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); + streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + ""); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); @@ -164,15 +157,10 @@ gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); - MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); - JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); - if (mediaList != null) { - if (mediaList.getInteger("code") == 0) { - JSONArray data = mediaList.getJSONArray("data"); - if (data == null) { - streamPushMapper.del(stream.getApp(), stream.getStream()); - } - } + MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); + List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null); + if (mediaList != null && mediaList.isEmpty()) { + streamPushMapper.del(stream.getApp(), stream.getStream()); } return del > 0; } @@ -195,8 +183,8 @@ gbStreamMapper.del(app, streamId); int delStream = streamPushMapper.del(app, streamId); if (delStream > 0) { - MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); + MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); + mediaServerService.closeStreams(mediaServerItem,app, streamId); } return true; } @@ -204,7 +192,7 @@ @Override public void zlmServerOnline(String mediaServerId) { // 鍚屾zlm鎺ㄦ祦淇℃伅 - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { return; } @@ -232,71 +220,61 @@ for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); } - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) { - return; + List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null); + if (mediaList == null) { + return; + } + List<StreamPushItem> streamPushItems = handleJSON(mediaList); + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = handleJSON(dataStr, mediaServerItem); - } - } - - if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { - pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - } - } - List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - int runLimit = 300; - if (offlinePushItems.size() > runLimit) { - for (int i = 0; i < offlinePushItems.size(); i += runLimit) { - int toIndex = i + runLimit; - if (i + runLimit > offlinePushItems.size()) { - toIndex = offlinePushItems.size(); - } - List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); - streamPushMapper.delAll(streamPushItemsSub); + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); } - }else { - streamPushMapper.delAll(offlinePushItems); + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); } - - } - Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); - if (offlineOnStreamChangedHookParamList.size() > 0) { - String type = "PUSH"; - for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); - jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); - // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 - redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); - } + }else { + streamPushMapper.delAll(offlinePushItems); } - Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); - if (streamAuthorityInfos.size() > 0) { - for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); - } + } + Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); + if (offlineOnStreamChangedHookParamList.size() > 0) { + String type = "PUSH"; + for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); + jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); + // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 + redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); } - })); + } + + Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); + if (streamAuthorityInfos.size() > 0) { + for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); + } + } } @Override @@ -470,8 +448,8 @@ int delStream = streamPushMapper.delAllForGbStream(gbStreams); if (delStream > 0) { for (GbStream gbStream : gbStreams) { - MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); + mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); } } -- Gitblit v1.8.0