| | |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.InitializingBean; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.sdp.*; |
| | |
| | | import javax.sip.message.Response; |
| | | import java.text.ParseException; |
| | | import java.time.Instant; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.Random; |
| | | import java.util.Vector; |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * SIP命令类型: INVITE请求 |
| | |
| | | |
| | | @Autowired |
| | | private IRedisRpcService redisRpcService; |
| | | |
| | | @Autowired |
| | | private RedisTemplate<Object, Object> redisTemplate; |
| | | |
| | | @Autowired |
| | | private SSRCFactory ssrcFactory; |
| | |
| | | sendRtpItem.setOnlyAudio(false); |
| | | sendRtpItem.setStatus(0); |
| | | sendRtpItem.setSessionName(sessionName); |
| | | |
| | | // 清理可能存在的缓存避免用到旧的数据 |
| | | List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream()); |
| | | if (!sendRtpItemList.isEmpty()) { |
| | | for (SendRtpItem rtpItem : sendRtpItemList) { |
| | | redisCatchStorage.deleteSendRTPServer(rtpItem); |
| | | } |
| | | } |
| | | if ("push".equals(gbStream.getStreamType())) { |
| | | sendRtpItem.setPlayType(InviteStreamType.PUSH); |
| | | if (streamPushItem != null) { |
| | |
| | | |
| | | StreamPushItem transform = streamPushService.transform(pushListItem); |
| | | transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | // 开始推流 |
| | | sendPushStream(sendRtpItem, mediaServerItem, platform, request); |
| | | }else { |
| | |
| | | } |
| | | }, userSetting.getPlatformPlayTimeout()); |
| | | // |
| | | redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { |
| | | long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { |
| | | dynamicTask.stop(sendRtpItem.getCallId()); |
| | | if (sendRtpItemKey == null) { |
| | | logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | try { |
| | | responseAck(request, Response.BUSY_HERE); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | return; |
| | | } |
| | | SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); |
| | | if (sendRtpItemFromRedis == null) { |
| | | logger.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | try { |
| | | responseAck(request, Response.BUSY_HERE); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | return; |
| | | } |
| | | if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { |
| | | |
| | | logger.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | int localPort = sendRtpPortManager.getNextPort(mediaServerItem); |
| | | if (localPort == 0) { |
| | | logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); |
| | | try { |
| | | responseAck(request, Response.BUSY_HERE); |
| | | } catch (SipException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } catch (InvalidArgumentException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } catch (ParseException e) { |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | return; |
| | |
| | | } |
| | | }); |
| | | // 添加回复的拒绝或者错误的通知 |
| | | // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"失败","app":"1","stream":"2"}' |
| | | redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { |
| | | if (response.getCode() != 0) { |
| | | dynamicTask.stop(sendRtpItem.getCallId()); |
| | | redisRpcService.stopWaitePushStreamOnline(sendRtpItem); |
| | | redisRpcService.removeCallback(key); |
| | | try { |
| | | responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | |
| | | * 来自其他wvp的推流 |
| | | */ |
| | | private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { |
| | | logger.info("[级联点播]直播流来自其他平台,发送redis消息"); |
| | | sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); |
| | | logger.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); |
| | | if (sendRtpItem == null) { |
| | | return; |
| | | } |
| | | // 写入redis, 超时时回复 |
| | | sendRtpItem.setStatus(1); |
| | | SIPResponse response = sendStreamAck(request, sendRtpItem, platform); |