src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.common.VideoManagerConstants; public class SendRtpItem { /** @@ -89,7 +91,7 @@ /** * invite 的 callId */ private String CallId; private String callId; /** * invite 的 fromTag @@ -242,11 +244,11 @@ } public String getCallId() { return CallId; return callId; } public void setCallId(String callId) { CallId = callId; this.callId = callId; } public InviteStreamType getPlayType() { @@ -364,7 +366,7 @@ ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + ", CallId='" + CallId + '\'' + ", CallId='" + callId + '\'' + ", fromTag='" + fromTag + '\'' + ", toTag='" + toTag + '\'' + ", pt=" + pt + @@ -376,4 +378,15 @@ ", sessionName='" + sessionName + '\'' + '}'; } public String getRedisKey() { String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + serverId + "_" + mediaServerId + "_" + platformId + "_" + channelId + "_" + stream + "_" + callId; return key; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -96,7 +96,7 @@ logger.info("[收到ACK]: 来自->{}", fromUserId); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); if (sendRtpItem == null) { logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId); logger.warn("[收到ACK]:未找到来自{},callId: {}", fromUserId, callIdHeader.getCallId()); return; } // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 @@ -117,7 +117,7 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem); WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem); if (wvpResult.getCode() == 0) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -137,7 +137,7 @@ if (platform != null) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { redisRpcService.stopSendRtp(sendRtpItem); redisRpcService.stopSendRtp(sendRtpItem.getRedisKey()); redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null); }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.sdp.*; @@ -84,6 +85,9 @@ @Autowired private IRedisRpcService redisRpcService; @Autowired private RedisTemplate<Object, Object> redisTemplate; @Autowired private SSRCFactory ssrcFactory; @@ -604,6 +608,7 @@ StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); redisCatchStorage.updateSendRTPSever(sendRtpItem); // 开始推流 sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { @@ -766,7 +771,7 @@ redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { redisRpcService.stopWaitePushStreamOnline(sendRtpItem); redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.REQUEST_TIMEOUT); // 超时 @@ -775,8 +780,27 @@ } }, userSetting.getPlatformPlayTimeout()); // redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { dynamicTask.stop(sendRtpItem.getCallId()); if (sendRtpItemKey == null) { logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } return; } SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItemFromRedis == null) { logger.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } return; } if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { logger.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -784,11 +808,7 @@ logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); try { responseAck(request, Response.BUSY_HERE); } catch (SipException e) { logger.error("未处理的异常 ", e); } catch (InvalidArgumentException e) { logger.error("未处理的异常 ", e); } catch (ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } return; @@ -814,7 +834,7 @@ redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); redisRpcService.stopWaitePushStreamOnline(sendRtpItem); redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -831,7 +851,10 @@ */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); if (sendRtpItem == null) { return; } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -539,7 +539,7 @@ } }else { // 通知其他wvp停止发流 redisRpcService.rtpSendStopped(sendRtpItem); redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey()); } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -6,16 +6,16 @@ public interface IRedisRpcService { SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem); SendRtpItem getSendRtpItem(String sendRtpItemKey); WVPResult startSendRtp(SendRtpItem sendRtpItem); WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem); void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback); WVPResult stopSendRtp(String sendRtpItemKey); WVPResult stopSendRtp(SendRtpItem sendRtpItem); void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback); void stopWaitePushStreamOnline(SendRtpItem sendRtpItem); void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem); void rtpSendStopped(SendRtpItem sendRtpItem); void rtpSendStopped(String sendRtpItemKey); } src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; @@ -21,6 +20,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,14 @@ * 获取发流的信息 */ public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); String sendRtpItemKey = request.getParam().toString(); SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItem == null) { logger.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, key:{}", sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); return response; } logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 查询本级是否有这个流 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -103,9 +110,10 @@ sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); response.setBody(sendRtpItem); response.setBody(sendRtpItemKey); return response; } @@ -113,14 +121,25 @@ * 监听流上线 */ public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 查询本级是否有这个流 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaServerItem != null) { logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); // 读取redis中的上级点播信息,生成sendRtpItm发送出去 if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); sendRtpItem.setSsrc(ssrc); } sendRtpItem.setMediaServerId(mediaServerItem.getId()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem); response.setBody(sendRtpItem.getRedisKey()); response.setStatusCode(200); } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 @@ -139,8 +158,9 @@ sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem); response.setBody(sendRtpItem.getRedisKey()); response.setStatusCode(200); // 手动发送结果 sendResponse(response); @@ -153,7 +173,14 @@ * 停止监听流上线 */ public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); String sendRtpItemKey = request.getParam().toString(); SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItem == null) { logger.info("[redis-rpc] 停止监听流上线, 未找到redis中的发流信息, key:{}", sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); return response; } logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 @@ -168,24 +195,33 @@ * 开始发流 */ public RedisRpcResponse startSendRtp(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); String sendRtpItemKey = request.getParam().toString(); SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (sendRtpItem == null) { logger.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); response.setBody(wvpResult); return response; } logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); response.setBody(wvpResult); return response; } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (!streamReady) { logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); response.setBody(wvpResult); return response; } JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); WVPResult wvpResult = WVPResult.success(); @@ -202,43 +238,51 @@ * 停止发流 */ public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); String sendRtpItemKey = request.getParam().toString(); SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (sendRtpItem == null) { logger.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); response.setBody(wvpResult); return response; } logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); response.setBody(wvpResult); return response; } JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); WVPResult wvpResult = WVPResult.success(); response.setBody(wvpResult); response.setBody(WVPResult.success()); return response; }else { int code = jsonObject.getInteger("code"); String msg = jsonObject.getString("msg"); logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg ); WVPResult wvpResult = WVPResult.fail(code, msg); response.setBody(wvpResult); response.setBody(WVPResult.fail(code, msg)); return response; } return response; } /** * 其他wvp通知推流已经停止了 */ public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId()); String sendRtpItemKey = request.getParam().toString(); SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (sendRtpItemInCatch == null) { if (sendRtpItem == null) { logger.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息, key:{}", sendRtpItemKey); return response; } logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); if (platform == null) { src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -14,10 +14,12 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @Service @@ -37,6 +39,9 @@ @Autowired private SSRCFactory ssrcFactory; @Autowired private RedisTemplate<Object, Object> redisTemplate; private RedisRpcRequest buildRequest(String uri, Object param) { RedisRpcRequest request = new RedisRpcRequest(); request.setFromId(userSetting.getServerId()); @@ -46,32 +51,40 @@ } @Override public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) { RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem); public SendRtpItem getSendRtpItem(String sendRtpItemKey) { RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), SendRtpItem.class); if (response.getBody() == null) { return null; } return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString()); } @Override public WVPResult startSendRtp(SendRtpItem sendRtpItem) { public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) { logger.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem); RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override public WVPResult stopSendRtp(SendRtpItem sendRtpItem) { RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem); public WVPResult stopSendRtp(String sendRtpItemKey) { SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItem == null) { logger.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息"); } logger.info("[请求其他WVP] 停止推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) { public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) { logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( @@ -87,36 +100,47 @@ sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); if (callback != null) { callback.run(sendRtpItem); callback.run(sendRtpItem.getRedisKey()); } hookSubscribe.removeSubscribe(hook); }); RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, response -> { SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class); logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItemFromOther.getApp(), sendRtpItemFromOther.getStream(), sendRtpItemFromOther); if (response.getBody() == null) { logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); return; } logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString()); if (callback != null) { callback.run(sendRtpItemFromOther); callback.run(response.getBody().toString()); } }); } @Override public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) { public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) { logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey); HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.removeSubscribe(hook); RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } @Override public void rtpSendStopped(SendRtpItem sendRtpItem) { RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem); public void rtpSendStopped(String sendRtpItemKey) { SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItem == null) { logger.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", sendRtpItemKey); return; } RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -141,15 +141,7 @@ @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_" + sendRtpItem.getMediaServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_" + sendRtpItem.getStream() + "_" + sendRtpItem.getCallId(); redisTemplate.opsForValue().set(key, sendRtpItem); redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); } @Override @@ -186,7 +178,7 @@ callId = "*"; } String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + "*_*_" + platformGbId + "_" + channelId + "_" + streamId + "_" @@ -292,7 +284,7 @@ */ @Override public void deleteSendRTPServer(SendRtpItem sendRtpItem) { deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getServerId()); deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream()); } @Override src/main/resources/application.yml
@@ -2,4 +2,4 @@ application: name: wvp profiles: active: local active: local2