From bc0319b3f338412aa18f73bd749057e9ea3a7125 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 13 十二月 2021 17:20:23 +0800 Subject: [PATCH] 将device信息写入redis以提高sip处理速度 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 127 ++++++++++++++++++++++++++++++------------ 1 files changed, 90 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index dcca0e5..ecbddc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -154,47 +154,74 @@ if (mediaServerItem == null) { return; } + // 鏁版嵁搴撹褰� List<StreamPushItem> pushList = getPushList(mediaServerId); + Map<String, StreamPushItem> pushItemMap = new HashMap<>(); + // redis璁板綍 + List<StreamInfo> streamInfoPushList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map<String, StreamInfo> streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { - Map<String, StreamPushItem> pushItemMap = new HashMap<>(); for (StreamPushItem streamPushItem : pushList) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) return; - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = handleJSON(dataStr, mediaServerItem); - } - } - - if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { - pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); - } - } - Collection<StreamPushItem> offlinePushItems = pushItemMap.values(); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - streamPushMapper.delAll(new ArrayList<>(offlinePushItems)); - for (StreamPushItem offlinePushItem : offlinePushItems) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", offlinePushItem.getApp()); - jsonObject.put("stream", offlinePushItem.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream()); - } - } - })); } + if (streamInfoPushList.size() > 0) { + for (StreamInfo streamInfo : streamInfoPushList) { + streamInfoPushItemMap.put(streamInfo.getApp() + streamInfo.getStreamId(), streamInfo); + } + } + zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ + if (mediaList == null) return; + String dataStr = mediaList.getString("data"); + + Integer code = mediaList.getInteger("code"); + List<StreamPushItem> streamPushItems = null; + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = handleJSON(dataStr, mediaServerItem); + } + } + + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + } + } + List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + int runLimit = 300; + if (offlinePushItems.size() > runLimit) { + for (int i = 0; i < offlinePushItems.size(); i += runLimit) { + int toIndex = i + runLimit; + if (i + runLimit > offlinePushItems.size()) { + toIndex = offlinePushItems.size(); + } + List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex); + streamPushMapper.delAll(streamPushItemsSub); + } + }else { + streamPushMapper.delAll(offlinePushItems); + } + + } + Collection<StreamInfo> offlineStreamInfoItems = streamInfoPushItemMap.values(); + if (offlineStreamInfoItems.size() > 0) { + String type = "PUSH"; + for (StreamInfo offlineStreamInfoItem : offlineStreamInfoItems) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", offlineStreamInfoItem.getApp()); + jsonObject.put("stream", offlineStreamInfoItem.getStreamId()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineStreamInfoItem.getApp(), offlineStreamInfoItem.getStreamId()); + } + } + })); } @Override @@ -211,6 +238,8 @@ List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (streamInfoList.size() > 0) { for (StreamInfo streamInfo : streamInfoList) { + // 绉婚櫎redis鍐呮祦鐨勪俊鎭� + redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetup.getServerId()); jsonObject.put("app", streamInfo.getApp()); @@ -218,8 +247,6 @@ jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 绉婚櫎redis鍐呮祦鐨勪俊鎭� - redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); } } } @@ -228,4 +255,30 @@ public void clean() { } + + @Override + public boolean saveToRandomGB() { + List<StreamPushItem> streamPushItems = streamPushMapper.selectAll(); + long gbId = 100001; + for (StreamPushItem streamPushItem : streamPushItems) { + streamPushItem.setStreamType("push"); + streamPushItem.setStatus(true); + streamPushItem.setGbId("34020000004111" + gbId); + gbId ++; + } + int limitCount = 30; + + if (streamPushItems.size() > limitCount) { + for (int i = 0; i < streamPushItems.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > streamPushItems.size()) { + toIndex = streamPushItems.size(); + } + gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); + } + }else { + gbStreamMapper.batchAdd(streamPushItems); + } + return true; + } } -- Gitblit v1.8.0