panlinlin
2024-06-14 07a8ef9e256c70a3a5b15782add81dcad1e2ffc2
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -1,25 +1,33 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@Service
@DS("master")
public class InviteStreamServiceImpl implements IInviteStreamService {
    private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
@@ -29,8 +37,41 @@
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private IVideoManagerStorage storage;
    /**
     * 流到来的处理
     */
    @Async("taskExecutor")
    @org.springframework.context.event.EventListener
    public void onApplicationEvent(MediaArrivalEvent event) {
//        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
//
//        }
    }
    /**
     * 流离开的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaDepartureEvent event) {
        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
            InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
            if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
                removeInviteInfo(inviteInfo);
                storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
            }
        }
    }
    @Override
    public void updateInviteInfo(InviteInfo inviteInfo) {
        updateInviteInfo(inviteInfo, null);
    }
    @Override
    public void updateInviteInfo(InviteInfo inviteInfo, Long time) {
        if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
            logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
            return;
@@ -82,7 +123,11 @@
                ":" + inviteInfoForUpdate.getChannelId() +
                ":" + inviteInfoForUpdate.getStream()+
                ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
        redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
        if (time != null && time > 0) {
            redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS);
        }else {
            redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
        }
    }
    @Override
@@ -116,8 +161,11 @@
                ":" + (stream != null ? stream : "*")
                + ":*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() != 1) {
        if (scanResult.isEmpty()) {
            return null;
        }
        if (scanResult.size() != 1) {
            logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
@@ -204,6 +252,9 @@
                String keyStr = (String) keyObj;
                InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
                if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
                    if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) {
                        continue;
                    }
                    count++;
                }
            }
@@ -257,7 +308,7 @@
                ":" + inviteInfo.getDeviceId() +
                ":" + inviteInfo.getChannelId() +
                ":" + inviteInfo.getStream() +
                ":" + inviteInfo.getSsrcInfo().getSsrc();
                ":" + ssrc;
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
        }