From 07a8ef9e256c70a3a5b15782add81dcad1e2ffc2 Mon Sep 17 00:00:00 2001 From: panlinlin <648540858@qq.com> Date: 星期五, 14 六月 2024 00:03:57 +0800 Subject: [PATCH] SIP只有一个监听时,直接返回 --- src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java | 206 +++++++++++++++++++++++++++++++++++++++++++-------- 1 files changed, 173 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java old mode 100644 new mode 100755 index 8b8c839..e5847f6 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -1,36 +1,77 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSON; +import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.bean.InviteErrorCallback; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; @Service +@DS("master") public class InviteStreamServiceImpl implements IInviteStreamService { private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class); - private final Map<String, List<InviteErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); + private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); @Autowired private RedisTemplate<Object, Object> redisTemplate; + @Autowired + private IVideoManagerStorage storage; + + /** + * 娴佸埌鏉ョ殑澶勭悊 + */ + @Async("taskExecutor") + @org.springframework.context.event.EventListener + public void onApplicationEvent(MediaArrivalEvent event) { +// if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { +// +// } + } + + /** + * 娴佺寮�鐨勫鐞� + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { + InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream()); + if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { + removeInviteInfo(inviteInfo); + storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + } + } + } @Override public void updateInviteInfo(InviteInfo inviteInfo) { + updateInviteInfo(inviteInfo, null); + } + + @Override + public void updateInviteInfo(InviteInfo inviteInfo, Long time) { if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) { logger.warn("[鏇存柊Invite淇℃伅]锛屽弬鏁颁笉鍏細 {}", JSON.toJSON(inviteInfo)); return; @@ -77,23 +118,54 @@ } String key = VideoManagerConstants.INVITE_PREFIX + - "_" + inviteInfoForUpdate.getType() + - "_" + inviteInfoForUpdate.getDeviceId() + - "_" + inviteInfoForUpdate.getChannelId() + - "_" + inviteInfoForUpdate.getStream(); - redisTemplate.opsForValue().set(key, inviteInfoForUpdate); + ":" + inviteInfoForUpdate.getType() + + ":" + inviteInfoForUpdate.getDeviceId() + + ":" + inviteInfoForUpdate.getChannelId() + + ":" + inviteInfoForUpdate.getStream()+ + ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc(); + if (time != null && time > 0) { + redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS); + }else { + redisTemplate.opsForValue().set(key, inviteInfoForUpdate); + } + } + + @Override + public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) { + + InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (inviteInfoInDb == null) { + return null; + } + removeInviteInfo(inviteInfoInDb); + String key = VideoManagerConstants.INVITE_PREFIX + + ":" + inviteInfo.getType() + + ":" + inviteInfo.getDeviceId() + + ":" + inviteInfo.getChannelId() + + ":" + stream + + ":" + inviteInfo.getSsrcInfo().getSsrc(); + inviteInfoInDb.setStream(stream); + if (inviteInfoInDb.getSsrcInfo() != null) { + inviteInfoInDb.getSsrcInfo().setStream(stream); + } + redisTemplate.opsForValue().set(key, inviteInfoInDb); + return inviteInfoInDb; } @Override public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { String key = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); + ":" + (type != null ? type : "*") + + ":" + (deviceId != null ? deviceId : "*") + + ":" + (channelId != null ? channelId : "*") + + ":" + (stream != null ? stream : "*") + + ":*"; List<Object> scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.size() != 1) { + if (scanResult.isEmpty()) { return null; + } + if (scanResult.size() != 1) { + logger.warn("[鑾峰彇InviteInfo] 鍙戠幇 key: {}瀛樺湪澶氭潯", key); } return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); @@ -112,10 +184,11 @@ @Override public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { String scanKey = VideoManagerConstants.INVITE_PREFIX + - "_" + (type != null ? type : "*") + - "_" + (deviceId != null ? deviceId : "*") + - "_" + (channelId != null ? channelId : "*") + - "_" + (stream != null ? stream : "*"); + ":" + (type != null ? type : "*") + + ":" + (deviceId != null ? deviceId : "*") + + ":" + (channelId != null ? channelId : "*") + + ":" + (stream != null ? stream : "*") + + ":*"; List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey); if (scanResult.size() > 0) { for (Object keyObj : scanResult) { @@ -141,9 +214,9 @@ } @Override - public void once(InviteSessionType type, String deviceId, String channelId, String stream, InviteErrorCallback<Object> callback) { + public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) { String key = buildKey(type, deviceId, channelId, stream); - List<InviteErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); + List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); if (callbacks == null) { callbacks = new CopyOnWriteArrayList<>(); inviteErrorCallbackMap.put(key, callbacks); @@ -152,27 +225,94 @@ } - @Override - public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) { - String key = buildKey(type, deviceId, channelId, stream); - List<InviteErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); - if (callbacks == null) { - return; - } - for (InviteErrorCallback<Object> callback : callbacks) { - callback.run(code, msg, data); - } - inviteErrorCallbackMap.remove(key); - } - private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) { - String key = type + "_" + deviceId + "_" + channelId; + String key = type + ":" + deviceId + ":" + channelId; // 濡傛灉ssrc鏈猲ull閭d箞鍙互瀹炵幇涓�涓�氶亾鍙兘涓�娆℃搷浣滐紝ssrc涓嶄负null鍒欏彲浠ユ敮鎸佷竴涓�氶亾澶氭invite if (stream != null) { - key += ("_" + stream); + key += (":" + stream); } return key; } + @Override + public void clearInviteInfo(String deviceId) { + removeInviteInfo(null, deviceId, null, null); + } + + @Override + public int getStreamInfoCount(String mediaServerId) { + int count = 0; + String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*"; + List<Object> scanResult = RedisUtil.scan(redisTemplate, key); + if (scanResult.size() == 0) { + return 0; + }else { + for (Object keyObj : scanResult) { + String keyStr = (String) keyObj; + InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr); + if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) { + if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) { + continue; + } + count++; + } + } + } + return count; + } + + @Override + public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) { + String key = buildSubStreamKey(type, deviceId, channelId, stream); + List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); + if (callbacks == null) { + return; + } + for (ErrorCallback<Object> callback : callbacks) { + callback.run(code, msg, data); + } + inviteErrorCallbackMap.remove(key); + } + + + private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) { + String key = type + ":" + ":" + deviceId + ":" + channelId; + // 濡傛灉ssrc涓簄ull閭d箞鍙互瀹炵幇涓�涓�氶亾鍙兘涓�娆℃搷浣滐紝ssrc涓嶄负null鍒欏彲浠ユ敮鎸佷竴涓�氶亾澶氭invite + if (stream != null) { + key += (":" + stream); + } + return key; + } + + @Override + public InviteInfo getInviteInfoBySSRC(String ssrc) { + String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc; + List<Object> scanResult = RedisUtil.scan(redisTemplate, key); + if (scanResult.size() != 1) { + return null; + } + + return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); + } + + @Override + public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) { + InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (inviteInfoInDb == null) { + return null; + } + removeInviteInfo(inviteInfoInDb); + String key = VideoManagerConstants.INVITE_PREFIX + + ":" + inviteInfo.getType() + + ":" + inviteInfo.getDeviceId() + + ":" + inviteInfo.getChannelId() + + ":" + inviteInfo.getStream() + + ":" + ssrc; + if (inviteInfoInDb.getSsrcInfo() != null) { + inviteInfoInDb.getSsrcInfo().setSsrc(ssrc); + } + redisTemplate.opsForValue().set(key, inviteInfoInDb); + return inviteInfoInDb; + } } -- Gitblit v1.8.0