From 55a240bb4504baa9a75e44bc6cc597c96b80705d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 18 四月 2024 15:52:34 +0800 Subject: [PATCH] 临时提交 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 41 ++++++- src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 21 +++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 2 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 14 -- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java | 12 +- src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java | 100 ++++++++++++++----- src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java | 58 ++++++++--- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 4 src/main/resources/application.yml | 2 10 files changed, 176 insertions(+), 80 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index f1744d1..65c33fe 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.common.VideoManagerConstants; + public class SendRtpItem { /** @@ -89,7 +91,7 @@ /** * invite 鐨� callId */ - private String CallId; + private String callId; /** * invite 鐨� fromTag @@ -242,11 +244,11 @@ } public String getCallId() { - return CallId; + return callId; } public void setCallId(String callId) { - CallId = callId; + this.callId = callId; } public InviteStreamType getPlayType() { @@ -364,7 +366,7 @@ ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + - ", CallId='" + CallId + '\'' + + ", CallId='" + callId + '\'' + ", fromTag='" + fromTag + '\'' + ", toTag='" + toTag + '\'' + ", pt=" + pt + @@ -376,4 +378,15 @@ ", sessionName='" + sessionName + '\'' + '}'; } + + public String getRedisKey() { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + serverId + "_" + + mediaServerId + "_" + + platformId + "_" + + channelId + "_" + + stream + "_" + + callId; + return key; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 383e042..fabac36 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -96,7 +96,7 @@ logger.info("[鏀跺埌ACK]锛� 鏉ヨ嚜->{}", fromUserId); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); if (sendRtpItem == null) { - logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛岀洰鏍囦负({})鐨勬帹娴佷俊鎭�",fromUserId, toUserId); + logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛宑allId: {}", fromUserId, callIdHeader.getCallId()); return; } // tcp涓诲姩鏃讹紝姝ゆ椂鏄骇鑱斾笅绾у钩鍙帮紝鍦ㄥ洖澶�200ok鏃讹紝鏈湴宸茬粡璇锋眰zlm寮�鍚洃鍚紝璺宠繃涓嬮潰姝ラ @@ -117,7 +117,7 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem); + WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem); if (wvpResult.getCode() == 0) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 805173a..5b8bad4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -137,7 +137,7 @@ if (platform != null) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - redisRpcService.stopSendRtp(sendRtpItem); + redisRpcService.stopSendRtp(sendRtpItem.getRedisKey()); redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null); }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 51c5b69..ff84fc4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.sdp.*; @@ -84,6 +85,9 @@ @Autowired private IRedisRpcService redisRpcService; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; @Autowired private SSRCFactory ssrcFactory; @@ -604,6 +608,7 @@ StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + redisCatchStorage.updateSendRTPSever(sendRtpItem); // 寮�濮嬫帹娴� sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { @@ -766,7 +771,7 @@ redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 璁剧疆瓒呮椂 dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 @@ -775,8 +780,27 @@ } }, userSetting.getPlatformPlayTimeout()); // - redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { + redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { dynamicTask.stop(sendRtpItem.getCallId()); + if (sendRtpItemKey == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦寰楀埌缁撴灉鏈┖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItemFromRedis == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦, 鏈壘鍒皉edis涓紦瀛樼殑鍙戞祦淇℃伅锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { logger.info("[绾ц仈鐐规挱] 绛夊緟鐨勬帹娴佸湪鏈钩鍙颁笂绾� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -784,11 +808,7 @@ logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } return; @@ -814,7 +834,7 @@ redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -831,7 +851,10 @@ */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[绾ц仈鐐规挱] 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); + if (sendRtpItem == null) { + return; + } // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b68770e..4d29532 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -539,7 +539,7 @@ } }else { // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦 - redisRpcService.rtpSendStopped(sendRtpItem); + redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey()); } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 8d1b7f0..70d53bc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -6,16 +6,16 @@ public interface IRedisRpcService { - SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem); + SendRtpItem getSendRtpItem(String sendRtpItemKey); - WVPResult startSendRtp(SendRtpItem sendRtpItem); + WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem); - void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback); + WVPResult stopSendRtp(String sendRtpItemKey); - WVPResult stopSendRtp(SendRtpItem sendRtpItem); + void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback); - void stopWaitePushStreamOnline(SendRtpItem sendRtpItem); + void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem); - void rtpSendStopped(SendRtpItem sendRtpItem); + void rtpSendStopped(String sendRtpItemKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 69e942f..b61615b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.redisMsg.control; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; @@ -21,6 +20,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,14 @@ * 鑾峰彇鍙戞祦鐨勪俊鎭� */ public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -103,9 +110,10 @@ sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); - response.setBody(sendRtpItem); + response.setBody(sendRtpItemKey); return response; } @@ -113,14 +121,25 @@ * 鐩戝惉娴佷笂绾� */ public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaServerItem != null) { logger.info("[redis-rpc] 鐩戝惉娴佷笂绾挎椂鍙戠幇娴佸凡瀛樺湪鐩存帴杩斿洖锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setBody(sendRtpItem); + response.setBody(sendRtpItem.getRedisKey()); response.setStatusCode(200); } // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� @@ -139,8 +158,9 @@ sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setBody(sendRtpItem); + response.setBody(sendRtpItem.getRedisKey()); response.setStatusCode(200); // 鎵嬪姩鍙戦�佺粨鏋� sendResponse(response); @@ -153,7 +173,14 @@ * 鍋滄鐩戝惉娴佷笂绾� */ public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� @@ -168,24 +195,33 @@ * 寮�濮嬪彂娴� */ public RedisRpcResponse startSendRtp(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 寮�濮嬪彂娴�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅"); + response.setBody(wvpResult); + return response; + } logger.info("[redis-rpc] 寮�濮嬪彂娴侊細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] startSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer"); + response.setBody(wvpResult); + return response; } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (!streamReady) { logger.info("[redis-rpc] startSendRtp->娴佷笉鍦ㄧ嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "娴佷笉鍦ㄧ嚎"); + response.setBody(wvpResult); + return response; } JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 鍙戞祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); WVPResult wvpResult = WVPResult.success(); @@ -202,43 +238,51 @@ * 鍋滄鍙戞祦 */ public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅"); + response.setBody(wvpResult); + return response; + } logger.info("[redis-rpc] 鍋滄鎺ㄦ祦锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] stopSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer"); + response.setBody(wvpResult); + return response; } JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 鍋滄鎺ㄦ祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - WVPResult wvpResult = WVPResult.success(); - response.setBody(wvpResult); + response.setBody(WVPResult.success()); + return response; }else { int code = jsonObject.getInteger("code"); String msg = jsonObject.getString("msg"); logger.info("[redis-rpc] 鍋滄鎺ㄦ祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� code锛� {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg ); - WVPResult wvpResult = WVPResult.fail(code, msg); - response.setBody(wvpResult); + response.setBody(WVPResult.fail(code, msg)); + return response; } - return response; } /** * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜� */ public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); - logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId()); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); - if (sendRtpItemInCatch == null) { + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); return response; } + logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); if (platform == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 9dcfadf..f4c429c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -14,10 +14,12 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @Service @@ -37,6 +39,9 @@ @Autowired private SSRCFactory ssrcFactory; + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + private RedisRpcRequest buildRequest(String uri, Object param) { RedisRpcRequest request = new RedisRpcRequest(); request.setFromId(userSetting.getServerId()); @@ -46,32 +51,40 @@ } @Override - public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) { - - RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem); + public SendRtpItem getSendRtpItem(String sendRtpItemKey) { + RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); RedisRpcResponse response = redisRpcConfig.request(request, 10); - return JSON.parseObject(response.getBody().toString(), SendRtpItem.class); + if (response.getBody() == null) { + return null; + } + return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString()); } @Override - public WVPResult startSendRtp(SendRtpItem sendRtpItem) { + public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) { logger.info("[璇锋眰鍏朵粬WVP] 寮�濮嬫帹娴侊紝wvp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem); + RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override - public WVPResult stopSendRtp(SendRtpItem sendRtpItem) { - RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem); + public WVPResult stopSendRtp(String sendRtpItemKey) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + return WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彂娴佷俊鎭�"); + } + logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦锛寃vp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override - public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) { + public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) { logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( @@ -87,36 +100,47 @@ sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); if (callback != null) { - callback.run(sendRtpItem); + callback.run(sendRtpItem.getRedisKey()); } hookSubscribe.removeSubscribe(hook); }); RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, response -> { - SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class); - logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾� {}/{}->{}", sendRtpItemFromOther.getApp(), sendRtpItemFromOther.getStream(), sendRtpItemFromOther); + if (response.getBody() == null) { + logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾�,浣嗘槸鏈壘鍒板彂娴佷俊鎭細{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + return; + } + logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾� {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString()); + if (callback != null) { - callback.run(sendRtpItemFromOther); + callback.run(response.getBody().toString()); } }); } @Override - public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) { + public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) { + logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 {}/{}锛� key锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey); HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.removeSubscribe(hook); - RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); + RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } @Override - public void rtpSendStopped(SendRtpItem sendRtpItem) { - RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem); + public void rtpSendStopped(String sendRtpItemKey) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + return; + } + RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 97e4573..b2909ee 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -141,15 +141,7 @@ @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { - - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + - userSetting.getServerId() + "_" - + sendRtpItem.getMediaServerId() + "_" - + sendRtpItem.getPlatformId() + "_" - + sendRtpItem.getChannelId() + "_" - + sendRtpItem.getStream() + "_" - + sendRtpItem.getCallId(); - redisTemplate.opsForValue().set(key, sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); } @Override @@ -186,7 +178,7 @@ callId = "*"; } String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" + + "*_*_" + platformGbId + "_" + channelId + "_" + streamId + "_" @@ -292,7 +284,7 @@ */ @Override public void deleteSendRTPServer(SendRtpItem sendRtpItem) { - deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getServerId()); + deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream()); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3f47844..69f947e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ application: name: wvp profiles: - active: local \ No newline at end of file + active: local2 \ No newline at end of file -- Gitblit v1.8.0