| package com.genersoft.iot.vmp.service.impl; | 
|   | 
| import com.alibaba.fastjson2.JSON; | 
| import com.baomidou.dynamic.datasource.annotation.DS; | 
| import com.genersoft.iot.vmp.common.InviteInfo; | 
| import com.genersoft.iot.vmp.common.InviteSessionStatus; | 
| import com.genersoft.iot.vmp.common.InviteSessionType; | 
| import com.genersoft.iot.vmp.common.VideoManagerConstants; | 
| import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; | 
| import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; | 
| import com.genersoft.iot.vmp.service.IInviteStreamService; | 
| import com.genersoft.iot.vmp.service.bean.ErrorCallback; | 
| import com.genersoft.iot.vmp.storager.IVideoManagerStorage; | 
| import com.genersoft.iot.vmp.utils.redis.RedisUtil; | 
| import org.slf4j.Logger; | 
| import org.slf4j.LoggerFactory; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.context.event.EventListener; | 
| import org.springframework.data.redis.core.RedisTemplate; | 
| import org.springframework.scheduling.annotation.Async; | 
| import org.springframework.stereotype.Service; | 
|   | 
| import java.util.List; | 
| import java.util.Map; | 
| import java.util.concurrent.ConcurrentHashMap; | 
| import java.util.concurrent.CopyOnWriteArrayList; | 
|   | 
| @Service | 
| @DS("master") | 
| public class InviteStreamServiceImpl implements IInviteStreamService { | 
|   | 
|     private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class); | 
|   | 
|     private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); | 
|   | 
|     @Autowired | 
|     private RedisTemplate<Object, Object> redisTemplate; | 
|   | 
|     @Autowired | 
|     private IVideoManagerStorage storage; | 
|   | 
|     /** | 
|      * 流到来的处理 | 
|      */ | 
|     @Async("taskExecutor") | 
|     @org.springframework.context.event.EventListener | 
|     public void onApplicationEvent(MediaArrivalEvent event) { | 
| //        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { | 
| // | 
| //        } | 
|     } | 
|   | 
|     /** | 
|      * 流离开的处理 | 
|      */ | 
|     @Async("taskExecutor") | 
|     @EventListener | 
|     public void onApplicationEvent(MediaDepartureEvent event) { | 
|         if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { | 
|             InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream()); | 
|             if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { | 
|                 removeInviteInfo(inviteInfo); | 
|                 storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); | 
|             } | 
|         } | 
|     } | 
|   | 
|     @Override | 
|     public void updateInviteInfo(InviteInfo inviteInfo) { | 
|         if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) { | 
|             logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo)); | 
|             return; | 
|         } | 
|         InviteInfo inviteInfoForUpdate = null; | 
|   | 
|         if (InviteSessionStatus.ready == inviteInfo.getStatus()) { | 
|             if (inviteInfo.getDeviceId() == null | 
|                     || inviteInfo.getChannelId() == null | 
|                     || inviteInfo.getType() == null | 
|                     || inviteInfo.getStream() == null | 
|             ) { | 
|                 return; | 
|             } | 
|             inviteInfoForUpdate = inviteInfo; | 
|         } else { | 
|             InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), | 
|                     inviteInfo.getChannelId(), inviteInfo.getStream()); | 
|             if (inviteInfoInRedis == null) { | 
|                 logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}", | 
|                         inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); | 
|                 return; | 
|             } | 
|             if (inviteInfo.getStreamInfo() != null) { | 
|                 inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo()); | 
|             } | 
|             if (inviteInfo.getSsrcInfo() != null) { | 
|                 inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo()); | 
|             } | 
|             if (inviteInfo.getStreamMode() != null) { | 
|                 inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode()); | 
|             } | 
|             if (inviteInfo.getReceiveIp() != null) { | 
|                 inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp()); | 
|             } | 
|             if (inviteInfo.getReceivePort() != null) { | 
|                 inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort()); | 
|             } | 
|             if (inviteInfo.getStatus() != null) { | 
|                 inviteInfoInRedis.setStatus(inviteInfo.getStatus()); | 
|             } | 
|   | 
|             inviteInfoForUpdate = inviteInfoInRedis; | 
|   | 
|         } | 
|         String key = VideoManagerConstants.INVITE_PREFIX + | 
|                 ":" + inviteInfoForUpdate.getType() + | 
|                 ":" + inviteInfoForUpdate.getDeviceId() + | 
|                 ":" + inviteInfoForUpdate.getChannelId() + | 
|                 ":" + inviteInfoForUpdate.getStream()+ | 
|                 ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc(); | 
|         redisTemplate.opsForValue().set(key, inviteInfoForUpdate); | 
|     } | 
|   | 
|     @Override | 
|     public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) { | 
|   | 
|         InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); | 
|         if (inviteInfoInDb == null) { | 
|             return null; | 
|         } | 
|         removeInviteInfo(inviteInfoInDb); | 
|         String key = VideoManagerConstants.INVITE_PREFIX + | 
|                 ":" + inviteInfo.getType() + | 
|                 ":" + inviteInfo.getDeviceId() + | 
|                 ":" + inviteInfo.getChannelId() + | 
|                 ":" + stream + | 
|                 ":" + inviteInfo.getSsrcInfo().getSsrc(); | 
|         inviteInfoInDb.setStream(stream); | 
|         if (inviteInfoInDb.getSsrcInfo() != null) { | 
|             inviteInfoInDb.getSsrcInfo().setStream(stream); | 
|         } | 
|         redisTemplate.opsForValue().set(key, inviteInfoInDb); | 
|         return inviteInfoInDb; | 
|     } | 
|   | 
|     @Override | 
|     public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { | 
|         String key = VideoManagerConstants.INVITE_PREFIX + | 
|                 ":" + (type != null ? type : "*") + | 
|                 ":" + (deviceId != null ? deviceId : "*") + | 
|                 ":" + (channelId != null ? channelId : "*") + | 
|                 ":" + (stream != null ? stream : "*") | 
|                 + ":*"; | 
|         List<Object> scanResult = RedisUtil.scan(redisTemplate, key); | 
|         if (scanResult.isEmpty()) { | 
|             return null; | 
|         } | 
|         if (scanResult.size() != 1) { | 
|             logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key); | 
|         } | 
|   | 
|         return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); | 
|     } | 
|   | 
|     @Override | 
|     public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) { | 
|         return getInviteInfo(type, deviceId, channelId, null); | 
|     } | 
|   | 
|     @Override | 
|     public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) { | 
|         return getInviteInfo(type, null, null, stream); | 
|     } | 
|   | 
|     @Override | 
|     public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { | 
|         String scanKey = VideoManagerConstants.INVITE_PREFIX + | 
|                 ":" + (type != null ? type : "*") + | 
|                 ":" + (deviceId != null ? deviceId : "*") + | 
|                 ":" + (channelId != null ? channelId : "*") + | 
|                 ":" + (stream != null ? stream : "*") + | 
|                 ":*"; | 
|         List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey); | 
|         if (scanResult.size() > 0) { | 
|             for (Object keyObj : scanResult) { | 
|                 String key = (String) keyObj; | 
|                 InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key); | 
|                 if (inviteInfo == null) { | 
|                     continue; | 
|                 } | 
|                 redisTemplate.delete(key); | 
|                 inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream())); | 
|             } | 
|         } | 
|     } | 
|   | 
|     @Override | 
|     public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) { | 
|         removeInviteInfo(inviteSessionType, deviceId, channelId, null); | 
|     } | 
|   | 
|     @Override | 
|     public void removeInviteInfo(InviteInfo inviteInfo) { | 
|         removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); | 
|     } | 
|   | 
|     @Override | 
|     public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) { | 
|         String key = buildKey(type, deviceId, channelId, stream); | 
|         List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); | 
|         if (callbacks == null) { | 
|             callbacks = new CopyOnWriteArrayList<>(); | 
|             inviteErrorCallbackMap.put(key, callbacks); | 
|         } | 
|         callbacks.add(callback); | 
|   | 
|     } | 
|   | 
|     private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) { | 
|         String key = type + ":" +  deviceId + ":" + channelId; | 
|         // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite | 
|         if (stream != null) { | 
|             key += (":" + stream); | 
|         } | 
|         return key; | 
|     } | 
|   | 
|   | 
|     @Override | 
|     public void clearInviteInfo(String deviceId) { | 
|         removeInviteInfo(null, deviceId, null, null); | 
|     } | 
|   | 
|     @Override | 
|     public int getStreamInfoCount(String mediaServerId) { | 
|         int count = 0; | 
|         String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*"; | 
|         List<Object> scanResult = RedisUtil.scan(redisTemplate, key); | 
|         if (scanResult.size() == 0) { | 
|             return 0; | 
|         }else { | 
|             for (Object keyObj : scanResult) { | 
|                 String keyStr = (String) keyObj; | 
|                 InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr); | 
|                 if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) { | 
|                     count++; | 
|                 } | 
|             } | 
|         } | 
|         return count; | 
|     } | 
|   | 
|     @Override | 
|     public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) { | 
|         String key = buildSubStreamKey(type, deviceId, channelId, stream); | 
|         List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); | 
|         if (callbacks == null) { | 
|             return; | 
|         } | 
|         for (ErrorCallback<Object> callback : callbacks) { | 
|             callback.run(code, msg, data); | 
|         } | 
|         inviteErrorCallbackMap.remove(key); | 
|     } | 
|   | 
|   | 
|     private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) { | 
|         String key = type + ":" + ":" +  deviceId + ":" + channelId; | 
|         // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite | 
|         if (stream != null) { | 
|             key += (":" + stream); | 
|         } | 
|         return key; | 
|     } | 
|   | 
|     @Override | 
|     public InviteInfo getInviteInfoBySSRC(String ssrc) { | 
|         String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc; | 
|         List<Object> scanResult = RedisUtil.scan(redisTemplate, key); | 
|         if (scanResult.size() != 1) { | 
|             return null; | 
|         } | 
|   | 
|         return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); | 
|     } | 
|   | 
|     @Override | 
|     public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) { | 
|         InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); | 
|         if (inviteInfoInDb == null) { | 
|             return null; | 
|         } | 
|         removeInviteInfo(inviteInfoInDb); | 
|         String key = VideoManagerConstants.INVITE_PREFIX + | 
|                 ":" + inviteInfo.getType() + | 
|                 ":" + inviteInfo.getDeviceId() + | 
|                 ":" + inviteInfo.getChannelId() + | 
|                 ":" + inviteInfo.getStream() + | 
|                 ":" + ssrc; | 
|         if (inviteInfoInDb.getSsrcInfo() != null) { | 
|             inviteInfoInDb.getSsrcInfo().setSsrc(ssrc); | 
|         } | 
|         redisTemplate.opsForValue().set(key, inviteInfoInDb); | 
|         return inviteInfoInDb; | 
|     } | 
| } |