package com.genersoft.iot.vmp.service.impl;
|
|
import com.alibaba.fastjson2.JSON;
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
import com.genersoft.iot.vmp.common.InviteInfo;
|
import com.genersoft.iot.vmp.common.InviteSessionStatus;
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
|
import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
|
import com.genersoft.iot.vmp.service.IInviteStreamService;
|
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.event.EventListener;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Service;
|
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
@Service
|
@DS("master")
|
public class InviteStreamServiceImpl implements IInviteStreamService {
|
|
private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
|
|
private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
|
|
@Autowired
|
private RedisTemplate<Object, Object> redisTemplate;
|
|
@Autowired
|
private IVideoManagerStorage storage;
|
|
/**
|
* 流到来的处理
|
*/
|
@Async("taskExecutor")
|
@org.springframework.context.event.EventListener
|
public void onApplicationEvent(MediaArrivalEvent event) {
|
if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
|
|
}
|
}
|
|
/**
|
* 流离开的处理
|
*/
|
@Async("taskExecutor")
|
@EventListener
|
public void onApplicationEvent(MediaDepartureEvent event) {
|
if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
|
InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
|
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
|
removeInviteInfo(inviteInfo);
|
stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
|
}
|
}
|
}
|
|
@Override
|
public void updateInviteInfo(InviteInfo inviteInfo) {
|
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
|
logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
|
return;
|
}
|
InviteInfo inviteInfoForUpdate = null;
|
|
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
|
if (inviteInfo.getDeviceId() == null
|
|| inviteInfo.getChannelId() == null
|
|| inviteInfo.getType() == null
|
|| inviteInfo.getStream() == null
|
) {
|
return;
|
}
|
inviteInfoForUpdate = inviteInfo;
|
} else {
|
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
|
inviteInfo.getChannelId(), inviteInfo.getStream());
|
if (inviteInfoInRedis == null) {
|
logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
|
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
|
return;
|
}
|
if (inviteInfo.getStreamInfo() != null) {
|
inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
|
}
|
if (inviteInfo.getSsrcInfo() != null) {
|
inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
|
}
|
if (inviteInfo.getStreamMode() != null) {
|
inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
|
}
|
if (inviteInfo.getReceiveIp() != null) {
|
inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
|
}
|
if (inviteInfo.getReceivePort() != null) {
|
inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
|
}
|
if (inviteInfo.getStatus() != null) {
|
inviteInfoInRedis.setStatus(inviteInfo.getStatus());
|
}
|
|
inviteInfoForUpdate = inviteInfoInRedis;
|
|
}
|
String key = VideoManagerConstants.INVITE_PREFIX +
|
":" + inviteInfoForUpdate.getType() +
|
":" + inviteInfoForUpdate.getDeviceId() +
|
":" + inviteInfoForUpdate.getChannelId() +
|
":" + inviteInfoForUpdate.getStream()+
|
":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
|
redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
|
}
|
|
@Override
|
public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
|
|
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
|
if (inviteInfoInDb == null) {
|
return null;
|
}
|
removeInviteInfo(inviteInfoInDb);
|
String key = VideoManagerConstants.INVITE_PREFIX +
|
":" + inviteInfo.getType() +
|
":" + inviteInfo.getDeviceId() +
|
":" + inviteInfo.getChannelId() +
|
":" + stream +
|
":" + inviteInfo.getSsrcInfo().getSsrc();
|
inviteInfoInDb.setStream(stream);
|
if (inviteInfoInDb.getSsrcInfo() != null) {
|
inviteInfoInDb.getSsrcInfo().setStream(stream);
|
}
|
redisTemplate.opsForValue().set(key, inviteInfoInDb);
|
return inviteInfoInDb;
|
}
|
|
@Override
|
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
|
String key = VideoManagerConstants.INVITE_PREFIX +
|
":" + (type != null ? type : "*") +
|
":" + (deviceId != null ? deviceId : "*") +
|
":" + (channelId != null ? channelId : "*") +
|
":" + (stream != null ? stream : "*")
|
+ ":*";
|
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
|
if (scanResult.isEmpty()) {
|
return null;
|
}
|
if (scanResult.size() != 1) {
|
logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
|
}
|
|
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
|
}
|
|
@Override
|
public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
|
return getInviteInfo(type, deviceId, channelId, null);
|
}
|
|
@Override
|
public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
|
return getInviteInfo(type, null, null, stream);
|
}
|
|
@Override
|
public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
|
String scanKey = VideoManagerConstants.INVITE_PREFIX +
|
":" + (type != null ? type : "*") +
|
":" + (deviceId != null ? deviceId : "*") +
|
":" + (channelId != null ? channelId : "*") +
|
":" + (stream != null ? stream : "*") +
|
":*";
|
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
|
if (scanResult.size() > 0) {
|
for (Object keyObj : scanResult) {
|
String key = (String) keyObj;
|
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
|
if (inviteInfo == null) {
|
continue;
|
}
|
redisTemplate.delete(key);
|
inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
|
}
|
}
|
}
|
|
@Override
|
public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
|
removeInviteInfo(inviteSessionType, deviceId, channelId, null);
|
}
|
|
@Override
|
public void removeInviteInfo(InviteInfo inviteInfo) {
|
removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
|
}
|
|
@Override
|
public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
|
String key = buildKey(type, deviceId, channelId, stream);
|
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
|
if (callbacks == null) {
|
callbacks = new CopyOnWriteArrayList<>();
|
inviteErrorCallbackMap.put(key, callbacks);
|
}
|
callbacks.add(callback);
|
|
}
|
|
private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
|
String key = type + ":" + deviceId + ":" + channelId;
|
// 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
|
if (stream != null) {
|
key += (":" + stream);
|
}
|
return key;
|
}
|
|
|
@Override
|
public void clearInviteInfo(String deviceId) {
|
removeInviteInfo(null, deviceId, null, null);
|
}
|
|
@Override
|
public int getStreamInfoCount(String mediaServerId) {
|
int count = 0;
|
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
|
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
|
if (scanResult.size() == 0) {
|
return 0;
|
}else {
|
for (Object keyObj : scanResult) {
|
String keyStr = (String) keyObj;
|
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
|
if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
|
count++;
|
}
|
}
|
}
|
return count;
|
}
|
|
@Override
|
public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
|
String key = buildSubStreamKey(type, deviceId, channelId, stream);
|
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
|
if (callbacks == null) {
|
return;
|
}
|
for (ErrorCallback<Object> callback : callbacks) {
|
callback.run(code, msg, data);
|
}
|
inviteErrorCallbackMap.remove(key);
|
}
|
|
|
private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
|
String key = type + ":" + ":" + deviceId + ":" + channelId;
|
// 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
|
if (stream != null) {
|
key += (":" + stream);
|
}
|
return key;
|
}
|
|
@Override
|
public InviteInfo getInviteInfoBySSRC(String ssrc) {
|
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
|
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
|
if (scanResult.size() != 1) {
|
return null;
|
}
|
|
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
|
}
|
|
@Override
|
public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
|
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
|
if (inviteInfoInDb == null) {
|
return null;
|
}
|
removeInviteInfo(inviteInfoInDb);
|
String key = VideoManagerConstants.INVITE_PREFIX +
|
":" + inviteInfo.getType() +
|
":" + inviteInfo.getDeviceId() +
|
":" + inviteInfo.getChannelId() +
|
":" + inviteInfo.getStream() +
|
":" + ssrc;
|
if (inviteInfoInDb.getSsrcInfo() != null) {
|
inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
|
}
|
redisTemplate.opsForValue().set(key, inviteInfoInDb);
|
return inviteInfoInDb;
|
}
|
}
|