648540858
2022-11-21 6b03568c5dd128a3d71c02fb1a3a76a4344a4920
优化rtcp判断
10个文件已修改
120 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.gb28181.bean;
import gov.nist.javax.sip.message.SIPRequest;
public class SendRtpItem {
    /**
@@ -107,6 +105,11 @@
     * 当usePs 为false时,有效。为1时,发送音频;为0时,发送视频;不传时默认为0
     */
    private boolean onlyAudio = false;
    /**
     * 是否开启rtcp保活
     */
    private boolean rtcp = false;
    /**
@@ -281,4 +284,12 @@
    public void setToTag(String toTag) {
        this.toTag = toTag;
    }
    public boolean isRtcp() {
        return rtcp;
    }
    public void setRtcp(boolean rtcp) {
        this.rtcp = rtcp;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
@@ -1,8 +1,6 @@
package com.genersoft.iot.vmp.gb28181.session;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -12,6 +10,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
/**    
 * @description:视频流session管理器,管理视频预览、预览回放的通信句柄 
@@ -27,7 +28,8 @@
    public enum SessionType {
        play,
        playback,
        download
        download,
        broadcast
    }
    /**
@@ -50,9 +52,8 @@
        ssrcTransaction.setSsrc(ssrc);
        ssrcTransaction.setMediaServerId(mediaServerId);
        ssrcTransaction.setType(type);
        RedisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
                + "_" +  deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction);
        System.out.println(22222);
        System.out.println(JSON.toJSONString(ssrcTransaction));
        RedisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
                + "_" +  deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction);
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -646,7 +646,7 @@
     * 视频流停止, 不使用回调
     */
    @Override
    public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
    public void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
        streamByeCmd(device, channelId, stream, callId, null);
    }
@@ -654,7 +654,7 @@
     * 视频流停止
     */
    @Override
    public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
    public void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
        SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callId, stream);
        if (ssrcTransaction == null) {
            throw new SsrcTransactionNotFoundException(device.getDeviceId(), channelId, callId, stream);
@@ -669,7 +669,7 @@
    }
    @Override
    public synchronized void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
    public void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
        Request byteRequest = headerProvider.createByteRequest(device, channelId, sipTransactionInfo);
        sipSender.transmitRequest(device.getTransport(), byteRequest, null, okEvent);
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -4,10 +4,10 @@
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -22,15 +22,12 @@
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
@@ -38,7 +35,6 @@
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import java.text.ParseException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.HashMap;
@@ -122,7 +118,8 @@
        }
        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
        logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
@@ -132,9 +129,9 @@
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        if (!sendRtpItem.isTcp() && parentPlatform != null && parentPlatform.isRtcp()) {
        if (!sendRtpItem.isTcp()) {
            // 开启rtcp保活
            param.put("udp_rtcp_timeout", "1");
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
        }
        JSONObject jsonObject;
@@ -145,6 +142,9 @@
            param.put("dst_url", sendRtpItem.getIp());
            param.put("dst_port", sendRtpItem.getPort());
            jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
            System.out.println(JSON.toJSONString(param));
            System.out.println();
            System.out.println(jsonObject);
        }
            if (jsonObject == null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -126,6 +126,9 @@
    @Autowired
    private SipConfig config;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
@@ -383,7 +386,7 @@
                    }
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId,
                            mediaTransmissionTCP);
                            mediaTransmissionTCP, platform.isRtcp());
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
@@ -579,7 +582,7 @@
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                        mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -619,7 +622,7 @@
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                        mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -736,7 +739,7 @@
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP);
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
@@ -798,7 +801,7 @@
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, null, responseSendItemMsg -> {
                channelId, mediaTransmissionTCP, platform.isRtcp(), null, responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");
@@ -904,6 +907,7 @@
        }
        if (device != null) {
            logger.info("收到设备" + requesterId + "的语音广播Invite请求");
            try {
                responseAck(request, Response.TRYING);
            } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -980,7 +984,8 @@
                }
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        device.getDeviceId(), audioBroadcastCatch.getChannelId(),
                        mediaTransmissionTCP);
                        mediaTransmissionTCP, false);
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    try {
@@ -1006,12 +1011,16 @@
                sendRtpItem.setStreamId(stream);
                sendRtpItem.setPt(8);
                sendRtpItem.setUsePs(false);
                sendRtpItem.setRtcp(false);
                sendRtpItem.setOnlyAudio(true);
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream);
                if (streamReady) {
                    sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
                    SIPResponse sipResponse = sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
                    // 添加事务信息
                    streamSession.put(device.getDeviceId(), audioBroadcastCatch.getChannelId(), request.getCallIdHeader().getCallId()
                            , stream,  sendRtpItem.getSsrc(), mediaServerItem.getId(), sipResponse, VideoStreamSessionManager.SessionType.broadcast );
                }else {
                    logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", app, stream);
                    playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
@@ -1029,7 +1038,8 @@
        }
    }
    void sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request,  MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){
    SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request,  MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){
        SIPResponse sipResponse = null;
        try {
            sendRtpItem.setStatus(2);
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -1065,15 +1075,17 @@
            parentPlatform.setServerPort(device.getPort());
            parentPlatform.setServerGBId(device.getDeviceId());
            SIPResponse sipResponse = responseSdpAck(request, content.toString(), parentPlatform);
            sipResponse = responseSdpAck(request, content.toString(), parentPlatform);
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId());
            audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
            audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
            audioBroadcastManager.update(audioBroadcastCatch);
        } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) {
            logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage());
        }
        return sipResponse;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -531,7 +531,7 @@
//                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                                        }
                                    }else {
                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                        cmder.streamByeCmd(device, null, null, sendRtpItem.getCallId());
                                    }
                                }
@@ -771,7 +771,7 @@
    @ResponseBody
    @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
    public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){
        logger.info("[ZLM HOOK] rtpServer收流超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
        logger.info("[ZLM HOOK] rtpServer rtp超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -177,7 +177,7 @@
     * @param tcp 是否为tcp
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){
        // 默认为随机端口
        int localPort = 0;
@@ -197,6 +197,7 @@
        sendRtpItem.setDeviceId(deviceId);
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(tcp);
        sendRtpItem.setRtcp(rtcp);
        sendRtpItem.setApp("rtp");
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
@@ -214,7 +215,7 @@
     * @param tcp 是否为tcp
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){
        // 默认为随机端口
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
@@ -235,6 +236,7 @@
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(serverItem.getId());
        sendRtpItem.setRtcp(rtcp);
        return sendRtpItem;
    }
src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java
@@ -63,10 +63,16 @@
    private Boolean isTcp;
    /**
     * 是否使用TCP
     */
    private Boolean rtcp;
    public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port,
                                                          String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) {
                                                          String ssrc, String platformId, String channelId, Boolean isTcp, Boolean rtcp, String platformName) {
        RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg();
        requestSendItemMsg.setServerId(serverId);
        requestSendItemMsg.setMediaServerId(mediaServerId);
@@ -79,6 +85,7 @@
        requestSendItemMsg.setPlatformName(platformName);
        requestSendItemMsg.setChannelId(channelId);
        requestSendItemMsg.setTcp(isTcp);
        requestSendItemMsg.setRtcp(rtcp);
        return  requestSendItemMsg;
    }
@@ -170,4 +177,12 @@
    public void setTcp(Boolean tcp) {
        isTcp = tcp;
    }
    public Boolean getRtcp() {
        return rtcp;
    }
    public void setRtcp(Boolean rtcp) {
        this.rtcp = rtcp;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -347,7 +347,7 @@
//                        }
                        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(),
                                device.getDeviceId(), channelId,
                                false);
                                false, false);
//                        if (sendRtpItem.getLocalPort() == 0) {
@@ -375,6 +375,7 @@
                        sendRtpItem.setStreamId("1000");
                        sendRtpItem.setSsrc(ssrc);
                        sendRtpItem.setOnlyAudio(true);
                        sendRtpItem.setRtcp(false);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        Map<String, Object> param = new HashMap<>(12);
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -318,7 +318,7 @@
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp());
                content.getTcp(), content.getRtcp());
        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
        result.setCode(0);
@@ -348,9 +348,9 @@
     * @param callback 得到信息的回调
     */
    public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
                        String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
                        String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
        RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName);
                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
        requestSendItemMsg.setServerId(serverId);
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,