From 407a5edebfe43395d37e414604e5c5100fd605a8 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 17 四月 2024 23:31:48 +0800 Subject: [PATCH] 优化多wvp国标级联推流时推流信息的清理 --- src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java | 243 +++++++++++++++--------------------------------- 1 files changed, 77 insertions(+), 166 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java old mode 100644 new mode 100755 index ed73dd1..9dc86f8 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSON; +import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; @@ -20,6 +21,7 @@ import java.util.concurrent.CopyOnWriteArrayList; @Service +@DS("master") public class InviteStreamServiceImpl implements IInviteStreamService { private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class); @@ -77,23 +79,50 @@ } String key = VideoManagerConstants.INVITE_PREFIX + - "_" + inviteInfoForUpdate.getType() + - "_" + inviteInfoForUpdate.getDeviceId() + - "_" + inviteInfoForUpdate.getChannelId() + - "_" + inviteInfoForUpdate.getStream(); + ":" + inviteInfoForUpdate.getType() + + ":" + inviteInfoForUpdate.getDeviceId() + + ":" + inviteInfoForUpdate.getChannelId() + + ":" + inviteInfoForUpdate.getStream()+ + ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc(); redisTemplate.opsForValue().set(key, inviteInfoForUpdate); + } + + @Override + public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) { + + InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (inviteInfoInDb == null) { + return null; + } + removeInviteInfo(inviteInfoInDb); + String key = VideoManagerConstants.INVITE_PREFIX + + ":" + inviteInfo.getType() + + ":" + inviteInfo.getDeviceId() + + ":" + inviteInfo.getChannelId() + + ":" + stream + + ":" + inviteInfo.getSsrcInfo().getSsrc(); + inviteInfoInDb.setStream(stream); + if (inviteInfoInDb.getSsrcInfo() != null) { + inviteInfoInDb.getSsrcInfo().setStream(stream); + } + redisTemplate.opsForValue().set(key, inviteInfoInDb); + return inviteInfoInDb; } @Override public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { String key = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); + ":" + (type != null ? type : "*") + + ":" + (deviceId != null ? deviceId : "*") + + ":" + (channelId != null ? channelId : "*") + + ":" + (stream != null ? stream : "*") + + ":*"; List<Object> scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.size() != 1) { + if (scanResult.isEmpty()) { return null; + } + if (scanResult.size() != 1) { + logger.warn("[鑾峰彇InviteInfo] 鍙戠幇 key: {}瀛樺湪澶氭潯", key); } return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); @@ -112,10 +141,11 @@ @Override public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { String scanKey = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); + ":" + (type != null ? type : "*") + + ":" + (deviceId != null ? deviceId : "*") + + ":" + (channelId != null ? channelId : "*") + + ":" + (stream != null ? stream : "*") + + ":*"; List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey); if (scanResult.size() > 0) { for (Object keyObj : scanResult) { @@ -152,24 +182,11 @@ } - @Override - public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) { - String key = buildKey(type, deviceId, channelId, stream); - List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); - if (callbacks == null) { - return; - } - for (ErrorCallback<Object> callback : callbacks) { - callback.run(code, msg, data); - } - inviteErrorCallbackMap.remove(key); - } - private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) { - String key = type + "_" + deviceId + "_" + channelId; + String key = type + ":" + deviceId + ":" + channelId; // 濡傛灉ssrc鏈猲ull閭d箞鍙互瀹炵幇涓�涓�氶亾鍙兘涓�娆℃搷浣滐紝ssrc涓嶄负null鍒欏彲浠ユ敮鎸佷竴涓�氶亾澶氭invite if (stream != null) { - key += ("_" + stream); + key += (":" + stream); } return key; } @@ -183,7 +200,7 @@ @Override public int getStreamInfoCount(String mediaServerId) { int count = 0; - String key = VideoManagerConstants.INVITE_PREFIX + "_*_*_*_*"; + String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*"; List<Object> scanResult = RedisUtil.scan(redisTemplate, key); if (scanResult.size() == 0) { return 0; @@ -199,69 +216,9 @@ return count; } - /*======================璁惧涓诲瓙鐮佹祦閫昏緫START=========================*/ - @Override - public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId, boolean isSubStream) { - return getInviteInfo(type, deviceId, channelId,isSubStream, null); - } - - @Override - public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId, boolean isSubStream) { - removeInviteInfo(inviteSessionType, deviceId, channelId,isSubStream, null); - } - - @Override - public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream) { - String key = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (isSubStream ? "sub" : "main") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); - List<Object> scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.size() != 1) { - return null; - } - return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); - } - - @Override - public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) { - String scanKey = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (isSubStream ? "sub" : "main") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); - List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey); - if (scanResult.size() > 0) { - for (Object keyObj : scanResult) { - String key = (String) keyObj; - InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key); - if (inviteInfo == null) { - continue; - } - redisTemplate.delete(key); - inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream())); - } - } - } - - @Override - public void once(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, ErrorCallback<Object> callback) { - String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream); - List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); - if (callbacks == null) { - callbacks = new CopyOnWriteArrayList<>(); - inviteErrorCallbackMap.put(key, callbacks); - } - callbacks.add(callback); - } - - @Override - public void call(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, int code, String msg, Object data) { - String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream); + public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) { + String key = buildSubStreamKey(type, deviceId, channelId, stream); List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); if (callbacks == null) { return; @@ -273,89 +230,43 @@ } - private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) { - String key = type + "_" + (isSubStream ? "sub":"main") + "_" + deviceId + "_" + channelId; + private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) { + String key = type + ":" + ":" + deviceId + ":" + channelId; // 濡傛灉ssrc涓簄ull閭d箞鍙互瀹炵幇涓�涓�氶亾鍙兘涓�娆℃搷浣滐紝ssrc涓嶄负null鍒欏彲浠ユ敮鎸佷竴涓�氶亾澶氭invite if (stream != null) { - key += ("_" + stream); + key += (":" + stream); } return key; } - @Override - public void updateInviteInfoSub(InviteInfo inviteInfo) { - if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) { - logger.warn("[鏇存柊Invite淇℃伅]锛屽弬鏁颁笉鍏細 {}", JSON.toJSON(inviteInfo)); - return; - } - InviteInfo inviteInfoForUpdate = null; - - if (InviteSessionStatus.ready == inviteInfo.getStatus()) { - if (inviteInfo.getDeviceId() == null - || inviteInfo.getChannelId() == null - || inviteInfo.getType() == null - || inviteInfo.getStream() == null - ) { - return; - } - inviteInfoForUpdate = inviteInfo; - } else { - InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), - inviteInfo.getChannelId(),inviteInfo.isSubStream(), inviteInfo.getStream()); - if (inviteInfoInRedis == null) { - logger.warn("[鏇存柊Invite淇℃伅]锛屾湭浠庣紦瀛樹腑璇诲彇鍒癐nvite淇℃伅锛� deviceId: {}, channel: {}, stream: {}", - inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); - return; - } - if (inviteInfo.getStreamInfo() != null) { - inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo()); - } - if (inviteInfo.getSsrcInfo() != null) { - inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo()); - } - if (inviteInfo.getStreamMode() != null) { - inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode()); - } - if (inviteInfo.getReceiveIp() != null) { - inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp()); - } - if (inviteInfo.getReceivePort() != null) { - inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort()); - } - if (inviteInfo.getStatus() != null) { - inviteInfoInRedis.setStatus(inviteInfo.getStatus()); - } - - inviteInfoForUpdate = inviteInfoInRedis; - - } - String key = VideoManagerConstants.INVITE_PREFIX + - "_" + inviteInfoForUpdate.getType() + - "_" + (inviteInfoForUpdate.isSubStream() ? "sub":"main") + - "_" + inviteInfoForUpdate.getDeviceId() + - "_" + inviteInfoForUpdate.getChannelId() + - "_" + inviteInfoForUpdate.getStream(); - redisTemplate.opsForValue().set(key, inviteInfoForUpdate); - } @Override - public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream, boolean isSubStream) { - return getInviteInfo(type, null, null,isSubStream, stream); - } - - @Override - public List<Object> getInviteInfos(InviteSessionType type, String deviceId, String channelId, String stream) { - String key = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); + public InviteInfo getInviteInfoBySSRC(String ssrc) { + String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc; List<Object> scanResult = RedisUtil.scan(redisTemplate, key); - return scanResult; + if (scanResult.size() != 1) { + return null; + } + + return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); } - /*======================璁惧涓诲瓙鐮佹祦閫昏緫END=========================*/ - - - - + @Override + public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) { + InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (inviteInfoInDb == null) { + return null; + } + removeInviteInfo(inviteInfoInDb); + String key = VideoManagerConstants.INVITE_PREFIX + + ":" + inviteInfo.getType() + + ":" + inviteInfo.getDeviceId() + + ":" + inviteInfo.getChannelId() + + ":" + inviteInfo.getStream() + + ":" + ssrc; + if (inviteInfoInDb.getSsrcInfo() != null) { + inviteInfoInDb.getSsrcInfo().setSsrc(ssrc); + } + redisTemplate.opsForValue().set(key, inviteInfoInDb); + return inviteInfoInDb; + } } -- Gitblit v1.8.0