648540858
2023-07-07 885842249fb6b264b0abf78668872d04bdc179ce
优化第三方对接接口
3个文件已修改
102 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java 75 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -154,6 +154,7 @@
    public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
    public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
    public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
    public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_";
    /**
     * Redis Const
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
@@ -22,14 +23,13 @@
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
@@ -105,6 +105,9 @@
    @Autowired
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Qualifier("taskExecutor")
    @Autowired
@@ -255,6 +258,21 @@
                result.setEnable_mp4(true);
            }
        }
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "*";
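        // Scan Redis for third-party RTP receive records of this server; if one matches the pushed stream, enable audio and MP4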
        List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
        if (scan.size()>0) {
            for (Object o : scan) {
                String key = (String) o;
                OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
                if (otherRtpSendInfo != null && otherRtpSendInfo.getStream().equalsIgnoreCase(param.getStream())) {
                    result.setEnable_audio(true);
                    result.setEnable_mp4(true);
                }
            }
        }
        if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) {
            logger.info("推流时发现尚未设置录像路径,从assist服务中读取");
            JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null);
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
@@ -34,6 +35,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接")
@@ -120,12 +122,12 @@
        int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
        // 注册回调如果rtp收流超时则通过回调发送通知
        if (callBack != null) {
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
            // Subscribe to the RTP-server timeout hook; when it fires for this stream, notify the caller's callBack URL
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (mediaServerItemInUse, response)->{
                        if (stream.equals(response.getString("stream_id"))) {
                            logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            logger.info("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
                            OkHttpClient client = httpClientBuilder.build();
                            String url = callBack + "?callId="  + callId;
@@ -133,7 +135,7 @@
                            try {
                                client.newCall(request).execute();
                            } catch (IOException e) {
                                logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                                logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                            }
                        }
                    });
@@ -143,6 +145,9 @@
        otherRtpSendInfo.setReceivePort(localPort);
        otherRtpSendInfo.setCallId(callId);
        otherRtpSendInfo.setStream(stream);
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
        // 将信息写入redis中,以备后用
        redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo);
        if (isSend != null && isSend) {
            String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
            // 预创建发流信息
@@ -160,7 +165,7 @@
            }, 15000);
            otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
            otherRtpSendInfo.setPort(port);
            logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
            logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
        }
        return otherRtpSendInfo;
    }
@@ -173,6 +178,9 @@
        logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        zlmServerFactory.closeRtpServer(mediaServerItem,stream);
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
        // Remove the receive record for this stream from Redis after closing the RTP server
        redisTemplate.delete(receiveKey);
    }
    @GetMapping(value = "/send/start")
@@ -187,9 +195,10 @@
    @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
    @Parameter(name = "isUdp", description = "是否为UDP", required = true)
    @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) {
        logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
                ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
    @Parameter(name = "pt", description = "rtp的pt", required = true)
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) {
        logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}",
                ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt);
        if (ObjectUtils.isEmpty(streamType)) {
            streamType = 1;
        }
@@ -197,7 +206,7 @@
        String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo != null) {
            zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId());
            zlmServerFactory.releasePort(mediaServerItem, callId);
        }else {
            sendInfo = new OtherRtpSendInfo();
        }
@@ -218,19 +227,51 @@
        param.put("src_port", sendInfo.getPort());
        param.put("use_ps", streamType==2 ? "1" : "0");
        param.put("only_audio", onlyAudio ? "1" : "0");
        param.put("pt", pt);
        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
            redisTemplate.opsForValue().set(key, sendInfo);
        dynamicTask.stop(key);
        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
        if (streamReady) {
            logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId);
            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
            if (jsonObject.getInteger("code") == 0) {
                logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
                redisTemplate.opsForValue().set(key, sendInfo);
            }else {
                redisTemplate.delete(key);
                logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
            }
        }else {
            redisTemplate.delete(key);
            logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
            logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
            String uuid = UUID.randomUUID().toString();
            HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
            dynamicTask.startDelay(uuid, ()->{
                logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId);
                redisTemplate.delete(key);
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
            }, 10000);
            // Subscribe to the stream-changed hook so that sending starts once the stream comes online
            OtherRtpSendInfo finalSendInfo = sendInfo;
            hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
                    (mediaServerItemInUse, response)->{
                        dynamicTask.stop(uuid);
                        logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId);
                        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
                        System.out.println("========发流结果==========");
                        System.out.println(jsonObject);
                        if (jsonObject.getInteger("code") == 0) {
                            logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
                            redisTemplate.opsForValue().set(key, finalSendInfo);
                        }else {
                            redisTemplate.delete(key);
                            logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
                            throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
                        }
                    });
        }
    }
    @GetMapping(value = "/send/stop")
    @ResponseBody