648540858
2024-04-17 bf6e09d231f49fb0c2cd5a81f6b31cc64d27c368
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -7,8 +7,10 @@
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -18,12 +20,17 @@
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
/**
 * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
@@ -57,6 +64,14 @@
    // Spring-injected Redis template (Object keys and values).
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    // SIP command sender for platforms; rtpSendStopped calls streamByeCmd on it.
    // NOTE(review): "Fro" looks like a typo for "For"; the name is referenced by
    // rtpSendStopped, so it is left as-is here.
    @Autowired
    private ISIPCommanderForPlatform commanderFroPlatform;
    // Storage accessor; rtpSendStopped uses it to look up a ParentPlatform by server GB id.
    @Autowired
    private IVideoManagerStorage storager;
    /**
@@ -133,6 +148,20 @@
        return null;
    }
    /**
     * 停止监听流上线
     */
    public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 停止监听流上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.removeSubscribe(hook);
        return null;
    }
    /**
     * 开始发流
@@ -194,6 +223,34 @@
        return response;
    }
    /**
     * 其他wvp通知推流已经停止了
     */
    public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 推流已经停止: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
        SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId());
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItemInCatch == null) {
            return response;
        }
        String platformId = sendRtpItem.getPlatformId();
        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
        if (platform == null) {
            return response;
        }
        try {
            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                    sendRtpItem.getCallId(), sendRtpItem.getStream());
            redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
        }
        return response;
    }
    private void sendResponse(RedisRpcResponse response){
        response.setToId(userSetting.getServerId());
        RedisRpcMessage message = new RedisRpcMessage();