package com.genersoft.iot.vmp.service.impl;

import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Stores invite session records in Redis and keeps an in-memory registry of one-shot callbacks
 * that are waiting for the outcome of an invite.
 *
 * <p>Redis keys have the layout
 * {@code INVITE_PREFIX:type:deviceId:channelId:stream:ssrc}. Lookups replace every unknown
 * segment with {@code *} and scan for matching keys.
 */
@Service
public class InviteStreamServiceImpl implements IInviteStreamService {

    private static final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);

    /** One-shot callbacks keyed by {@link #buildKey}. */
    private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();

    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    /**
     * Creates or updates the stored invite record.
     *
     * <p>When the status is {@code ready} the given object is stored as-is. Otherwise the
     * existing record is loaded and only the non-null stream info, ssrc info, stream mode,
     * receive ip, receive port and status of {@code inviteInfo} are copied onto it.
     *
     * @param inviteInfo the invite data; ignored (with a warning) when it is null or lacks a
     *                   device id or channel id
     */
    @Override
    public void updateInviteInfo(InviteInfo inviteInfo) {
        if (inviteInfo == null || inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null) {
            logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
            return;
        }

        InviteInfo inviteInfoForUpdate;
        if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
            // Device id and channel id were validated above; type and stream are also key segments.
            if (inviteInfo.getType() == null || inviteInfo.getStream() == null) {
                return;
            }
            inviteInfoForUpdate = inviteInfo;
        } else {
            InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
                    inviteInfo.getChannelId(), inviteInfo.getStream());
            if (inviteInfoInRedis == null) {
                logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
                        inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
                return;
            }
            if (inviteInfo.getStreamInfo() != null) {
                inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
            }
            // NOTE(review): if the new ssrc info carries a different ssrc, the record is written
            // under a new key and the old key is left behind; getInviteInfo() then sees two
            // matches and returns null. Use updateInviteInfoForSSRC() to change the ssrc.
            if (inviteInfo.getSsrcInfo() != null) {
                inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
            }
            if (inviteInfo.getStreamMode() != null) {
                inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
            }
            if (inviteInfo.getReceiveIp() != null) {
                inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
            }
            if (inviteInfo.getReceivePort() != null) {
                inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
            }
            if (inviteInfo.getStatus() != null) {
                inviteInfoInRedis.setStatus(inviteInfo.getStatus());
            }
            inviteInfoForUpdate = inviteInfoInRedis;
        }

        // The ssrc is the last key segment, so a record without ssrc info cannot be stored.
        if (inviteInfoForUpdate.getSsrcInfo() == null) {
            logger.warn("[更新Invite信息],缺少ssrc信息: deviceId: {}, channel: {}, stream: {}",
                    inviteInfoForUpdate.getDeviceId(), inviteInfoForUpdate.getChannelId(),
                    inviteInfoForUpdate.getStream());
            return;
        }

        String key = VideoManagerConstants.INVITE_PREFIX
                + ":" + inviteInfoForUpdate.getType()
                + ":" + inviteInfoForUpdate.getDeviceId()
                + ":" + inviteInfoForUpdate.getChannelId()
                + ":" + inviteInfoForUpdate.getStream()
                + ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
        redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
    }

    /**
     * Moves the stored record identified by {@code inviteInfo} to a new stream id.
     *
     * @param inviteInfo identifies the existing record and supplies the ssrc for the new key
     * @param stream     the new stream id
     * @return the updated record, or {@code null} when no single matching record exists or
     *         {@code inviteInfo} carries no ssrc info
     */
    @Override
    public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
                inviteInfo.getChannelId(), inviteInfo.getStream());
        if (inviteInfoInDb == null) {
            return null;
        }
        // Validate before removing anything so a bad argument cannot destroy the stored record.
        if (inviteInfo.getSsrcInfo() == null) {
            logger.warn("[更新Invite信息],缺少ssrc信息: deviceId: {}, channel: {}, stream: {}",
                    inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
            return null;
        }
        String key = VideoManagerConstants.INVITE_PREFIX
                + ":" + inviteInfo.getType()
                + ":" + inviteInfo.getDeviceId()
                + ":" + inviteInfo.getChannelId()
                + ":" + stream
                + ":" + inviteInfo.getSsrcInfo().getSsrc();

        removeInviteInfo(inviteInfoInDb);

        inviteInfoInDb.setStream(stream);
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setStream(stream);
        }
        redisTemplate.opsForValue().set(key, inviteInfoInDb);
        return inviteInfoInDb;
    }

    /**
     * Looks up a single invite record. Any argument may be {@code null} to act as a wildcard.
     *
     * @return the record, or {@code null} when zero or more than one key matches
     */
    @Override
    public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
        List<Object> scanResult = RedisUtil.scan(redisTemplate, buildScanPattern(type, deviceId, channelId, stream));
        if (scanResult.size() != 1) {
            return null;
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
    }

    /** Same as {@link #getInviteInfo} with the stream left as a wildcard. */
    @Override
    public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
        return getInviteInfo(type, deviceId, channelId, null);
    }

    /** Same as {@link #getInviteInfo} with device id and channel id left as wildcards. */
    @Override
    public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
        return getInviteInfo(type, null, null, stream);
    }

    /**
     * Deletes every invite record matching the arguments ({@code null} acts as a wildcard) and
     * drops the callbacks registered for each deleted record.
     */
    @Override
    public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
        List<Object> scanResult = RedisUtil.scan(redisTemplate, buildScanPattern(type, deviceId, channelId, stream));
        for (Object keyObj : scanResult) {
            String key = (String) keyObj;
            InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
            if (inviteInfo == null) {
                continue;
            }
            redisTemplate.delete(key);
            // Build the callback key from the stored record, not from the arguments: the
            // arguments may be null wildcards, which would never match a registered key.
            inviteErrorCallbackMap.remove(buildKey(inviteInfo.getType(), inviteInfo.getDeviceId(),
                    inviteInfo.getChannelId(), inviteInfo.getStream()));
        }
    }

    /** Same as {@link #removeInviteInfo(InviteSessionType, String, String, String)} for any stream. */
    @Override
    public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
        removeInviteInfo(inviteSessionType, deviceId, channelId, null);
    }

    /** Deletes the record(s) matching the type, device id, channel id and stream of {@code inviteInfo}. */
    @Override
    public void removeInviteInfo(InviteInfo inviteInfo) {
        removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
    }

    /**
     * Registers a callback that is run by the next matching
     * {@link #call(InviteSessionType, String, String, String, int, String, Object)} and then discarded.
     */
    @Override
    public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
        String key = buildKey(type, deviceId, channelId, stream);
        // compute() is atomic per key, so the add cannot interleave with call() removing the list.
        inviteErrorCallbackMap.compute(key, (k, callbacks) -> {
            List<ErrorCallback<Object>> target = callbacks != null ? callbacks : new CopyOnWriteArrayList<>();
            target.add(callback);
            return target;
        });
    }

    /**
     * Builds the callback registry key {@code type:deviceId:channelId[:stream]}.
     *
     * <p>Without a stream the key identifies the channel as a whole, so only one pending
     * operation per channel can be tracked; with a stream, several invites on the same channel
     * can be tracked independently.
     */
    private static String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = type + ":" + deviceId + ":" + channelId;
        if (stream != null) {
            key += (":" + stream);
        }
        return key;
    }

    /** Builds the Redis scan pattern for the given segments; {@code null} segments become {@code *}. */
    private static String buildScanPattern(InviteSessionType type, String deviceId, String channelId, String stream) {
        return VideoManagerConstants.INVITE_PREFIX
                + ":" + (type != null ? type : "*")
                + ":" + (deviceId != null ? deviceId : "*")
                + ":" + (channelId != null ? channelId : "*")
                + ":" + (stream != null ? stream : "*")
                + ":*";
    }

    /** Deletes every invite record of the given device, regardless of type, channel or stream. */
    @Override
    public void clearInviteInfo(String deviceId) {
        removeInviteInfo(null, deviceId, null, null);
    }

    /**
     * Counts the stored invite records whose stream info belongs to the given media server.
     *
     * @param mediaServerId the media server id to match; {@code null} matches nothing
     * @return the number of matching records
     */
    @Override
    public int getStreamInfoCount(String mediaServerId) {
        String pattern = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, pattern);
        int count = 0;
        for (Object keyObj : scanResult) {
            InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get((String) keyObj);
            // Compare from the argument side so a stream info without a media server id is
            // skipped instead of throwing.
            if (inviteInfo != null
                    && inviteInfo.getStreamInfo() != null
                    && mediaServerId != null
                    && mediaServerId.equals(inviteInfo.getStreamInfo().getMediaServerId())) {
                count++;
            }
        }
        return count;
    }

    /**
     * Runs and discards every callback registered through {@link #once} for the same type,
     * device id, channel id and stream.
     */
    @Override
    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
        // Must use the same key builder as once(), otherwise registered callbacks are never found.
        String key = buildKey(type, deviceId, channelId, stream);
        // Remove first so that concurrent calls cannot run the same callbacks twice.
        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.remove(key);
        if (callbacks == null) {
            return;
        }
        for (ErrorCallback<Object> callback : callbacks) {
            callback.run(code, msg, data);
        }
    }

    /**
     * Looks up a single invite record by its ssrc.
     *
     * @return the record, or {@code null} when zero or more than one key matches
     */
    @Override
    public InviteInfo getInviteInfoBySSRC(String ssrc) {
        String pattern = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
        List<Object> scanResult = RedisUtil.scan(redisTemplate, pattern);
        if (scanResult.size() != 1) {
            return null;
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
    }

    /**
     * Moves the stored record identified by {@code inviteInfo} to a new ssrc.
     *
     * @param inviteInfo identifies the existing record
     * @param ssrc       the new ssrc
     * @return the updated record, or {@code null} when no single matching record exists
     */
    @Override
    public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
                inviteInfo.getChannelId(), inviteInfo.getStream());
        if (inviteInfoInDb == null) {
            return null;
        }
        removeInviteInfo(inviteInfoInDb);
        String key = VideoManagerConstants.INVITE_PREFIX
                + ":" + inviteInfo.getType()
                + ":" + inviteInfo.getDeviceId()
                + ":" + inviteInfo.getChannelId()
                + ":" + inviteInfo.getStream()
                + ":" + ssrc;
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
        }
        redisTemplate.opsForValue().set(key, inviteInfoInDb);
        return inviteInfoInDb;
    }
}