From cf23816c80771c9e22bb50d0d89feb34d2edca79 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 13 一月 2022 16:42:58 +0800 Subject: [PATCH] 恢复合并的limit值 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 170 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 7928d5a..3fac37a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -3,26 +3,28 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @Service public class StreamPushServiceImpl implements IStreamPushService { @@ -34,6 +36,9 @@ private StreamPushMapper streamPushMapper; @Autowired + private ParentPlatformMapper parentPlatformMapper; + + @Autowired private PlatformGbStreamMapper platformGbStreamMapper; @Autowired @@ -41,6 +46,9 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + + @Autowired + private UserSetup userSetup; @Autowired private IMediaServerService mediaServerService; @@ -55,7 +63,9 @@ for (MediaItem item : mediaItems) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� - if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) { + if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { String key = item.getApp() + "_" + item.getStream(); StreamPushItem streamPushItem = result.get(key); if (streamPushItem == null) { @@ -63,7 +73,6 @@ result.put(key, streamPushItem); } } - } return new ArrayList<>(result.values()); @@ -97,10 +106,28 @@ } @Override + public List<StreamPushItem> getPushList(String mediaServerId) { + return streamPushMapper.selectAllByMediaServerId(mediaServerId); + } + + @Override public boolean saveToGB(GbStream stream) { stream.setStreamType("push"); stream.setStatus(true); int add = gbStreamMapper.add(stream); + // 鏌ユ壘寮�鍚簡鍏ㄩ儴鐩存挱娴佸叡浜殑涓婄骇骞冲彴 + List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + stream.setCatalogId(parentPlatform.getCatalogId()); + stream.setPlatformId(parentPlatform.getServerGBId()); + String streamId = stream.getStream(); + StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); + if (streamProxyItems == null) { + platformGbStreamMapper.add(stream); + } + } + } return add > 0; } @@ -135,4 +162,138 @@ return true; } + @Override + public void zlmServerOnline(String mediaServerId) { + // 鍚屾zlm鎺ㄦ祦淇℃伅 + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + return; + } + // 鏁版嵁搴撹褰� + List<StreamPushItem> pushList = getPushList(mediaServerId); + Map<String, StreamPushItem> pushItemMap = new HashMap<>(); + // redis璁板綍 + List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>(); + if (pushList.size() > 0) { + for (StreamPushItem streamPushItem : pushList) { + pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); + } + } + if (mediaItems.size() > 0) { + for (MediaItem mediaItem : mediaItems) { + streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); + } + } + zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ + if (mediaList == null) return; + String dataStr = mediaList.getString("data"); + + Integer code = mediaList.getInteger("code"); + List<StreamPushItem> streamPushItems = null; + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = handleJSON(dataStr, mediaServerItem); + } + } + + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + } + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); + } + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); + } + }else { + streamPushMapper.delAll(offlinePushItems); + } + + } + Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values(); + if (offlineMediaItemList.size() > 0) { + String type = "PUSH"; + for (MediaItem offlineMediaItem : offlineMediaItemList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", offlineMediaItem.getApp()); + jsonObject.put("stream", offlineMediaItem.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); + } + } + })); + } + + @Override + public void zlmServerOffline(String mediaServerId) { + List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId); + // 绉婚櫎娌℃湁GBId鐨勬帹娴� + streamPushMapper.deleteWithoutGBId(mediaServerId); + gbStreamMapper.deleteWithoutGBId("push", mediaServerId); + // 鍏朵粬鐨勬祦璁剧疆鏈惎鐢� + gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false); + // 鍙戦�佹祦鍋滄娑堟伅 + String type = "PUSH"; + // 鍙戦�乺edis娑堟伅 + List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + if (streamInfoList.size() > 0) { + for (MediaItem mediaItem : streamInfoList) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", mediaItem.getApp()); + jsonObject.put("stream", mediaItem.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } + } + } + + @Override + public void clean() { + + } + + @Override + public boolean saveToRandomGB() { + List<StreamPushItem> streamPushItems = streamPushMapper.selectAll(); + long gbId = 100001; + for (StreamPushItem streamPushItem : streamPushItems) { + streamPushItem.setStreamType("push"); + streamPushItem.setStatus(true); + streamPushItem.setGbId("34020000004111" + gbId); + gbId ++; + } + int limitCount = 30; + + if (streamPushItems.size() > limitCount) { + for (int i = 0; i < streamPushItems.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > streamPushItems.size()) { + toIndex = streamPushItems.size(); + } + gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); + } + }else { + gbStreamMapper.batchAdd(streamPushItems); + } + return true; + } } -- Gitblit v1.8.0