648540858
2023-07-07 885842249fb6b264b0abf78668872d04bdc179ce
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
@@ -34,6 +35,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接")
@@ -120,12 +122,12 @@
        int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
        // 注册回调如果rtp收流超时则通过回调发送通知
        if (callBack != null) {
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (mediaServerItemInUse, response)->{
                        if (stream.equals(response.getString("stream_id"))) {
                            logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            logger.info("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
                            OkHttpClient client = httpClientBuilder.build();
                            String url = callBack + "?callId="  + callId;
@@ -133,7 +135,7 @@
                            try {
                                client.newCall(request).execute();
                            } catch (IOException e) {
                                logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                                logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                            }
                        }
                    });
@@ -143,6 +145,9 @@
        otherRtpSendInfo.setReceivePort(localPort);
        otherRtpSendInfo.setCallId(callId);
        otherRtpSendInfo.setStream(stream);
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
        // 将信息写入redis中,以备后用
        redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo);
        if (isSend != null && isSend) {
            String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
            // 预创建发流信息
@@ -160,7 +165,7 @@
            }, 15000);
            otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
            otherRtpSendInfo.setPort(port);
            logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
            logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
        }
        return otherRtpSendInfo;
    }
@@ -173,6 +178,9 @@
        logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        zlmServerFactory.closeRtpServer(mediaServerItem,stream);
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
        // 将信息写入redis中,以备后用
        redisTemplate.delete(receiveKey);
    }
    @GetMapping(value = "/send/start")
@@ -187,9 +195,10 @@
    @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
    @Parameter(name = "isUdp", description = "是否为UDP", required = true)
    @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) {
        logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
                ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
    @Parameter(name = "pt", description = "rtp的pt", required = true)
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) {
        logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}",
                ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt);
        if (ObjectUtils.isEmpty(streamType)) {
            streamType = 1;
        }
@@ -197,7 +206,7 @@
        String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo != null) {
            zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId());
            zlmServerFactory.releasePort(mediaServerItem, callId);
        }else {
            sendInfo = new OtherRtpSendInfo();
        }
@@ -218,19 +227,51 @@
        param.put("src_port", sendInfo.getPort());
        param.put("use_ps", streamType==2 ? "1" : "0");
        param.put("only_audio", onlyAudio ? "1" : "0");
        param.put("pt", pt);
        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
            redisTemplate.opsForValue().set(key, sendInfo);
        dynamicTask.stop(key);
        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
        if (streamReady) {
            logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId);
            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
            if (jsonObject.getInteger("code") == 0) {
                logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
                redisTemplate.opsForValue().set(key, sendInfo);
            }else {
                redisTemplate.delete(key);
                logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
            }
        }else {
            redisTemplate.delete(key);
            logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
            logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
            String uuid = UUID.randomUUID().toString();
            HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
            dynamicTask.startDelay(uuid, ()->{
                logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId);
                redisTemplate.delete(key);
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
            }, 10000);
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            OtherRtpSendInfo finalSendInfo = sendInfo;
            hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
                    (mediaServerItemInUse, response)->{
                        dynamicTask.stop(uuid);
                        logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId);
                        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
                        System.out.println("========发流结果==========");
                        System.out.println(jsonObject);
                        if (jsonObject.getInteger("code") == 0) {
                            logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
                            redisTemplate.opsForValue().set(key, finalSendInfo);
                        }else {
                            redisTemplate.delete(key);
                            logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
                            throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
                        }
                    });
        }
    }
    @GetMapping(value = "/send/stop")
    @ResponseBody