src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,9 +17,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -65,6 +67,8 @@ @Autowired private IStreamPushService streamPushService; @Autowired private IStreamProxyService streamProxyService; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -142,6 +146,7 @@ MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; StreamProxyItem proxyByAppAndStream =null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { if (channel.getStatus() == 0) { @@ -171,6 +176,13 @@ if ("push".equals(gbStream.getStreamType())) { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; } }else if("proxy".equals(gbStream.getStreamType())){ proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); if (proxyByAppAndStream == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; @@ -416,6 +428,7 @@ } } } else if (gbStream != null) { if("push".equals(gbStream.getStreamType())) { if (streamPushItem != null && streamPushItem.isPushIng()) { // 推流状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, @@ -424,6 +437,24 @@ // 未推流 拉起 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } }else if ("proxy".equals(gbStream.getStreamType())){ if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){ pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); }else{ //开启代理拉流 boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); if(start1) { pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); }else{ //失败后通知 notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } } } } @@ -442,7 +473,39 @@ /** * 安排推流 */ private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); } } private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, @@ -487,7 +550,6 @@ } } /** * 通知流上线 */ src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -8,6 +8,7 @@ import java.util.Map; import java.util.Set; import com.genersoft.iot.vmp.media.zlm.ZLMRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -53,6 +54,9 @@ @Autowired private SipConfig sipConfig; @Autowired private ZLMRunner zlmRunner; @Value("${server.ssl.enabled:false}") private boolean sslEnabled; @@ -277,7 +281,13 @@ return null; } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; return (MediaServerItem)redisUtil.get(key); MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key); if(null==serverItem){ //zlm服务不在线,启动重连 reloadZlm(); serverItem=(MediaServerItem)redisUtil.get(key); } return serverItem; } @Override @@ -470,8 +480,13 @@ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { logger.info("获取负载最低的节点时无在线节点,启动重连机制"); //启动重连 reloadZlm(); if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { logger.info("获取负载最低的节点时无在线节点"); return null; } } // 获取分数最低的,及并发最低的 @@ -633,8 +648,14 @@ MediaServerItem mediaServerItem = getOne(mediaServerId); if (mediaServerItem == null) { // zlm连接重试 logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm"); reloadZlm(); mediaServerItem = getOne(mediaServerId); if (mediaServerItem == null) { // zlm连接重试 logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息"); return; } } String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId; int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2; @@ -657,4 +678,12 @@ } } public void reloadZlm(){ try { zlmRunner.run(); Thread.sleep(500);//延迟0.5秒缓冲时间 } catch (Exception e) { logger.warn("尝试重连zlm失败!",e); } } } src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; @@ -78,6 +79,10 @@ @Autowired TransactionDefinition transactionDefinition; @Autowired private MediaConfig mediaConfig; @Override public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { if (jsonData == null) { @@ -142,6 +147,8 @@ stream.setStreamType("push"); stream.setStatus(true); stream.setCreateTime(DateUtil.getNow()); stream.setStreamType("push"); stream.setMediaServerId(mediaConfig.getId()); int add = gbStreamMapper.add(stream); return add > 0; } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -37,6 +38,8 @@ @Autowired private IMediaService mediaService; @Autowired private IStreamProxyService streamProxyService; /** @@ -95,9 +98,31 @@ result.setMsg("scccess"); result.setData(streamInfo); }else { //获取流失败,重启拉流后重试一次 streamProxyService.stop(app,stream); boolean start = streamProxyService.start(app, stream); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) { String host = request.getHeader("Host"); String localAddr = host.split(":")[0]; logger.info("使用{}作为返回流的ip", localAddr); streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); }else { streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); } if (streamInfo != null){ result.setCode(0); result.setMsg("scccess"); result.setData(streamInfo); }else { result.setCode(-1); result.setMsg("fail"); } } return result; } }