648540858
2022-08-08 935221ab4112894ad3bc92ed91eff3af8bd2226b
Merge pull request #567 from mrjackwang/wvp-28181-2.0

更新上级级联查看直播视频及代理拉流视频流bug
4个文件已修改
127 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,9 +17,11 @@
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,6 +67,8 @@
    @Autowired
    private IStreamPushService streamPushService;
    @Autowired
    private IStreamProxyService streamProxyService;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -142,6 +146,7 @@
                MediaServerItem mediaServerItem = null;
                StreamPushItem streamPushItem = null;
                StreamProxyItem proxyByAppAndStream =null;
                // 不是通道可能是直播流
                if (channel != null && gbStream == null) {
                    if (channel.getStatus() == 0) {
@@ -171,6 +176,13 @@
                        if ("push".equals(gbStream.getStreamType())) {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
                            }
                        }else if("proxy".equals(gbStream.getStreamType())){
                            proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
                            if (proxyByAppAndStream == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
@@ -416,6 +428,7 @@
                        }
                    }
                } else if (gbStream != null) {
                    if("push".equals(gbStream.getStreamType())) {
                    if (streamPushItem != null && streamPushItem.isPushIng()) {
                        // 推流状态
                        pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
@@ -424,6 +437,24 @@
                        // 未推流 拉起
                        notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    }else if ("proxy".equals(gbStream.getStreamType())){
                        if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
                            pushProxyStream(evt, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }else{
                            //开启代理拉流
                            boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
                            if(start1) {
                                pushProxyStream(evt, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }else{
                                //失败后通知
                                notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    }
                }
            }
@@ -442,7 +473,39 @@
    /**
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    responseAck(evt, Response.BUSY_HERE);
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                sendRtpItem.setDialog(dialogByteArray);
                byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                sendRtpItem.setTransaction(transactionByteArray);
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
        }
    }
    private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
@@ -487,7 +550,6 @@
        }
    }
    /**
     * 通知流上线
     */
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Set;
import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -53,6 +54,9 @@
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private ZLMRunner zlmRunner;
    @Value("${server.ssl.enabled:false}")
    private boolean sslEnabled;
@@ -277,7 +281,13 @@
            return null;
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        return (MediaServerItem)redisUtil.get(key);
        MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
        if(null==serverItem){
            //zlm服务不在线,启动重连
            reloadZlm();
            serverItem=(MediaServerItem)redisUtil.get(key);
        }
        return serverItem;
    }
    @Override
@@ -470,8 +480,13 @@
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
            logger.info("获取负载最低的节点时无在线节点,启动重连机制");
            //启动重连
            reloadZlm();
            if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
            logger.info("获取负载最低的节点时无在线节点");
            return null;
            }
        }
        // 获取分数最低的,及并发最低的
@@ -633,8 +648,14 @@
        MediaServerItem mediaServerItem = getOne(mediaServerId);
        if (mediaServerItem == null) {
            // zlm连接重试
            logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
            reloadZlm();
            mediaServerItem = getOne(mediaServerId);
            if (mediaServerItem == null) {
                // zlm连接重试
            logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
            return;
            }
        }
        String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
@@ -657,4 +678,12 @@
        }
    }
    public void reloadZlm(){
        try {
            zlmRunner.run();
            Thread.sleep(500);//延迟0.5秒缓冲时间
        } catch (Exception e) {
            logger.warn("尝试重连zlm失败!",e);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -78,6 +79,10 @@
    @Autowired
    TransactionDefinition transactionDefinition;
    @Autowired
    private MediaConfig mediaConfig;
    @Override
    public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
        if (jsonData == null) {
@@ -142,6 +147,8 @@
        stream.setStreamType("push");
        stream.setStatus(true);
        stream.setCreateTime(DateUtil.getNow());
        stream.setStreamType("push");
        stream.setMediaServerId(mediaConfig.getId());
        int add = gbStreamMapper.add(stream);
        return add > 0;
    }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
@@ -6,6 +6,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -37,6 +38,8 @@
    @Autowired
    private IMediaService mediaService;
    @Autowired
    private IStreamProxyService streamProxyService;
    /**
@@ -95,9 +98,31 @@
            result.setMsg("scccess");
            result.setData(streamInfo);
        }else {
            //获取流失败,重启拉流后重试一次
            streamProxyService.stop(app,stream);
            boolean start = streamProxyService.start(app, stream);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
                String host = request.getHeader("Host");
                String localAddr = host.split(":")[0];
                logger.info("使用{}作为返回流的ip", localAddr);
                streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
            }else {
                streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
            }
            if (streamInfo != null){
                result.setCode(0);
                result.setMsg("scccess");
                result.setData(streamInfo);
            }else {
            result.setCode(-1);
            result.setMsg("fail");
        }
        }
        return result;
    }
}