From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 16 四月 2024 22:10:35 +0800 Subject: [PATCH] 重构多wvp国标级联机制 --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 18 - src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java | 24 + src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java | 93 +++++ src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java | 205 ++++++++++++ src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 16 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 45 -- /dev/null | 76 ---- src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java | 87 +++++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 4 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 7 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 14 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java | 16 + src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java | 203 ++++++++++++ src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 21 + src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java | 100 ++++++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java | 9 17 files changed, 789 insertions(+), 151 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index 5385efa..5ddaed3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -34,23 +34,13 @@ @Autowired private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private RedisCloseStreamMsgListener redisCloseStreamMsgListener; - @Autowired - private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; @Autowired - private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener; - - @Autowired - private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; - - @Autowired - private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; + private RedisRpcConfig redisRpcConfig; /** @@ -69,12 +59,8 @@ container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); - container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); - container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); - container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); - container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED)); - container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE)); + container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java new file mode 100644 index 0000000..545fdc1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -0,0 +1,205 @@ +package com.genersoft.iot.vmp.conf.redis; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +@Component +public class RedisRpcConfig implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class); + + public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY"; + + private final Random random = new Random(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisRpcController redisRpcController; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] pattern) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class); + if (redisRpcMessage.getRequest() != null) { + handlerRequest(redisRpcMessage.getRequest()); + } else if (redisRpcMessage.getResponse() != null){ + handlerResponse(redisRpcMessage.getResponse()); + } else { + logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage)); + } + } catch (Exception e) { + logger.error("[redis rpc 瑙f瀽寮傚父] ", e); + } + } + }); + } + } + + private void handlerResponse(RedisRpcResponse response) { + if (userSetting.getServerId().equals(response.getToId())) { + return; + } + response(response); + } + + private void handlerRequest(RedisRpcRequest request) { + try { + if (userSetting.getServerId().equals(request.getFromId())) { + return; + } + Method method = getMethod(request.getUri()); + // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404 + if (userSetting.getServerId().equals(request.getToId())) { + if (method == null) { + // 鍥炲404缁撴灉 + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(404); + sendResponse(response); + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if(response != null) { + sendResponse(response); + } + }else { + if (method == null) { + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if (response != null) { + sendResponse(response); + } + } + }catch (InvocationTargetException | IllegalAccessException e) { + logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e); + } + + } + + private Method getMethod(String name) { + // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ + Method[] methods = redisRpcController.getClass().getMethods(); + for (Method method : methods) { + if (method.getName().equals(name)) { + return method; + } + } + return null; + } + + private void sendResponse(RedisRpcResponse response){ + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + private void sendRequest(RedisRpcRequest request){ + RedisRpcMessage message = new RedisRpcMessage(); + message.setRequest(request); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + + private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>(); + private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>(); + + public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { + request.setSn((long) random.nextInt(1000) + 1); + SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn()); + try { + sendRequest(request); + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); + } finally { + this.unsubscribe(request.getSn()); + } + return null; + } + + public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) { + request.setSn((long) random.nextInt(1000) + 1); + setCallback(request.getSn(), callback); + sendRequest(request); + } + + public Boolean response(RedisRpcResponse response) { + SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn()); + CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn()); + if (queue != null) { + try { + return queue.offer(response, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("{}", e.getMessage(), e); + } + }else if (callback != null) { + callback.run(response); + callbacks.remove(response.getSn()); + } + return false; + } + + private void unsubscribe(long key) { + topicSubscribers.remove(key); + } + + + private SynchronousQueue<RedisRpcResponse> subscribe(long key) { + SynchronousQueue<RedisRpcResponse> queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue<>()); + return queue; + } + + private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) { + if (!callbacks.containsKey(key)) { + callbacks.put(key, callback); + } + + } + + public void removeCallback(long key) { + callbacks.remove(key); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java new file mode 100644 index 0000000..061df6f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +public class RedisRpcMessage { + + private RedisRpcRequest request; + + private RedisRpcResponse response; + + public RedisRpcRequest getRequest() { + return request; + } + + public void setRequest(RedisRpcRequest request) { + this.request = request; + } + + public RedisRpcResponse getResponse() { + return response; + } + + public void setResponse(RedisRpcResponse response) { + this.response = response; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java new file mode 100644 index 0000000..e31eb45 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java @@ -0,0 +1,93 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +/** + * 閫氳繃redis鍙戦�佽姹� + */ +public class RedisRpcRequest { + + /** + * 鏉ヨ嚜鐨刉VP ID + */ + private String fromId; + + + /** + * 鐩爣鐨刉VP ID + */ + private String toId; + + /** + * 搴忓垪鍙� + */ + private long sn; + + /** + * 璁块棶鐨勮矾寰� + */ + private String uri; + + /** + * 鍙傛暟 + */ + private Object param; + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public Object getParam() { + return param; + } + + public void setParam(Object param) { + this.param = param; + } + + public long getSn() { + return sn; + } + + public void setSn(long sn) { + this.sn = sn; + } + + @Override + public String toString() { + return "RedisRpcRequest{" + + "fromId='" + fromId + '\'' + + ", toId='" + toId + '\'' + + ", sn='" + sn + '\'' + + ", uri='" + uri + '\'' + + ", param=" + param + + '}'; + } + + public RedisRpcResponse getResponse() { + RedisRpcResponse response = new RedisRpcResponse(); + response.setFromId(fromId); + response.setToId(toId); + response.setSn(sn); + response.setUri(uri); + return response; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java new file mode 100644 index 0000000..ef94816 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java @@ -0,0 +1,87 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +/** + * 閫氳繃redis鍙戦�佸洖澶� + */ +public class RedisRpcResponse { + + /** + * 鏉ヨ嚜鐨刉VP ID + */ + private String fromId; + + + /** + * 鐩爣鐨刉VP ID + */ + private String toId; + + + /** + * 搴忓垪鍙� + */ + private long sn; + + /** + * 鐘舵�佺爜 + */ + private int statusCode; + + /** + * 璁块棶鐨勮矾寰� + */ + private String uri; + + /** + * 鍙傛暟 + */ + private Object body; + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public long getSn() { + return sn; + } + + public void setSn(long sn) { + this.sn = sn; + } + + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public Object getBody() { + return body; + } + + public void setBody(Object body) { + this.body = body; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index af7db2e..b748451 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -15,9 +15,11 @@ import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -54,6 +56,8 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IRedisRpcService redisRpcService; @Autowired private UserSetting userSetting; @@ -113,7 +117,15 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - redisCatchStorage.sendStartSendRtp(sendRtpItem); +// redisCatchStorage.sendStartSendRtp(sendRtpItem); + WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem); + if (wvpResult.getCode() == 0) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), + sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), + sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); + } } else { JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 69f8142..2c134fd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -18,7 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -97,6 +97,9 @@ @Autowired private IStreamPushService pushService; + @Autowired + private IRedisRpcService redisRpcService; + @Override public void afterPropertiesSet() throws Exception { @@ -134,13 +137,8 @@ if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { // 鏌ヨ杩欒矾娴佹槸鍚︽槸鏈钩鍙扮殑 StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (push!= null && !push.isSelf()) { - // 涓嶆槸鏈钩鍙扮殑灏卞彂閫乺edis娑堟伅璁╁叾浠杦vp鍋滄鍙戞祦 - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); -// redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); - } + if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { + redisRpcService.stopSendRtp(sendRtpItem); }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f12f38a..59ff50c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -29,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -86,16 +85,13 @@ private IRedisCatchStorage redisCatchStorage; @Autowired - private IInviteStreamService inviteStreamService; + private IRedisRpcService redisRpcService; @Autowired private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; - - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -120,9 +116,6 @@ @Autowired private UserSetting userSetting; - - @Autowired - private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -594,15 +587,17 @@ sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { + sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - if (pushListItem != null) { sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); - // 鎺ㄦ祦鐘舵�� + // 寮�濮嬫帹娴� sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { if (!platform.isStartOfflinePush()) { @@ -702,8 +697,6 @@ } // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setFromTag(request.getFromTag()); - sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); @@ -714,7 +707,6 @@ sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { // 涓嶅湪绾� 鎷夎捣 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); @@ -769,18 +761,14 @@ dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); try { - redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); - mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } }, userSetting.getPlatformPlayTimeout()); - redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); - // 娣诲姞涓婄嚎鐨勯�氱煡 - mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { + // + redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { dynamicTask.stop(sendRtpItem.getCallId()); - redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -813,19 +801,7 @@ // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItemFromRedis, request, platform); } - }); - // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(sendRtpItem.getCallId()); - mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); - } - } }); } @@ -836,12 +812,9 @@ */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendStartSendRtp(sendRtpItem); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(request.getCallIdHeader().getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 346b4a2..32bf76a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -23,7 +23,6 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -99,9 +98,6 @@ @Autowired private EventPublisher eventPublisher; - - @Autowired - private RedisPlatformPushStreamOnlineLister zlmMediaListManager; @Autowired private ZlmHttpHookSubscribe subscribe; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 027e990..ec24abe 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -365,4 +365,13 @@ } return result; } + + public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) { + Map<String, Object> param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + return zlmresTfulUtils.startSendRtp(mediaServerItem, param); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 2e6151d..972db32 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import java.util.List; @@ -97,4 +96,5 @@ List<MediaServerItem> getAllWithAssistPort(); + MediaServerItem getMediaServerByAppAndStream(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 190d665..aeb0dc8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -36,19 +35,15 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; -import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import java.io.File; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; /** * 濯掍綋鏈嶅姟鍣ㄨ妭鐐圭鐞� @@ -751,6 +746,22 @@ @Override public List<MediaServerItem> getAllWithAssistPort() { + return mediaServerMapper.queryAllWithAssistPort(); } + + @Override + public MediaServerItem getMediaServerByAppAndStream(String app, String stream) { + List<MediaServerItem> mediaServerItemList = getAllOnline(); + if (mediaServerItemList.isEmpty()) { + return null; + } + for (MediaServerItem mediaServerItem : mediaServerItemList) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + return mediaServerItem; + } + } + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java new file mode 100644 index 0000000..a601ae9 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -0,0 +1,16 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; + +public interface IRedisRpcService { + + SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem); + + WVPResult startSendRtp(SendRtpItem sendRtpItem); + + void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback); + + WVPResult stopSendRtp(SendRtpItem sendRtpItem); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java deleted file mode 100755 index 8ad0807..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * @author lin - */ -@Component -public class RedisPlatformPushStreamOnlineLister implements MessageListener { - - private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister"); - - private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - /** - * 閫氳繃redis娑堟伅鎺ユ敹娴佷笂绾跨殑閫氱煡锛屽鏋滄湰鏈虹敱瀵硅繖涓祦鐨勭洃鍚紝鍒欏洖璋� - */ - @Override - public void onMessage(Message message, byte[] pattern) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); - sendStreamEvent(sendRtpItem); - } - }); - } - } - - private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); - - public void sendStreamEvent(SendRtpItem sendRtpItem) { - // 鏌ョ湅鎺ㄦ祦鐘舵�� - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(sendRtpItem); - } catch (ParseException e) { - logger.error("sendStreamEvent: ", e); - } - removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - } - } - - public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { - this.channelOnPublishEvents.put(app + "_" + stream, callback); - } - - public void removedChannelOnlineEventLister(String app, String stream) { - this.channelOnPublishEvents.remove(app + "_" + stream); - } - - public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { - return this.channelOnPublishEvents.get(app + "_" + stream); - } - - -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java deleted file mode 100755 index 25dd334..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 鏀跺埌娑堟伅鍚庡紑濮嬬粰涓婄骇鍙戞祦 - * @author lin - */ -@Component -public class RedisPlatformStartSendRtpListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class); - - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private IMediaServerService mediaServerService; - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); - sendRtpItem.getMediaServerId(); - MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaServer == null) { - return; - } - Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem); - sendRtp(sendRtpItem, mediaServer, sendRtpParam); - - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); - } - } - }); - } - } - - private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) { - String isUdp = sendRtpItem.isTcp() ? "0" : "1"; - Map<String, Object> param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - param.put("is_udp", isUdp); - if (!sendRtpItem.isTcp()) { - // udp妯″紡涓嬪紑鍚痳tcp淇濇椿 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - return param; - } - - private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){ - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); - } - }else { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); - } - } - return startSendRtpStreamResult; - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java deleted file mode 100755 index 25600a2..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅 - * @author lin - */ -@Component -public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class); - - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Autowired - private UserSetting userSetting; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private ZlmHttpHookSubscribe hookSubscribe; - - @Autowired - private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; - - @Autowired - private SSRCFactory ssrcFactory; - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - /** - * 褰撲笂绾х偣鎾椂锛岃繖閲岃礋璐g洃鍚瓑鍒版祦涓婄嚎锛屾祦涓婄嚎鍚庡鏋滄槸鍦ㄥ綋鍓嶆湇鍔″垯鐩存帴鍥炶皟锛屽鏋滄槸鍏朵粬wvp锛屽垯鐢眗edis娑堟伅杩涜閫氱煡 - */ - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); - if (messageForPushChannel == null - || ObjectUtils.isEmpty(messageForPushChannel.getApp()) - || ObjectUtils.isEmpty(messageForPushChannel.getStream()) - || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ - continue; - } - - // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� - HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( - messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", - null); - hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { - // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� - SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream()); - if (sendRtpItem.getSsrc() == null) { - // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); - sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); - redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem); - // 閫氱煡鍏朵粬wvp锛� 鐢盧edisPlatformPushStreamOnlineLister鎺ユ敹姝ょ洃鍚�� - redisCatchStorage.sendPushStreamOnline(sendRtpItem); - } - }); - - - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); - } - } - }); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java deleted file mode 100755 index a031573..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰 - * @author lin - */ -@Component -public class RedisPushStreamCloseResponseListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); - - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private ISIPCommanderForPlatform commanderFroPlatform; - - @Autowired - private UserSetting userSetting; - - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - - private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); - - public interface PushStreamResponseEvent{ - void run(MessageForPushChannelResponse response); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody())); - MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); - StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); - if (push != null) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( - push.getGbId()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); - if (parentPlatform != null) { - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); - try { - commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); - } - } - } - } - } - - } - - public void addEvent(String app, String stream, PushStreamResponseEvent callback) { - responseEvents.put(app + stream, callback); - } - - public void removeEvent(String app, String stream) { - responseEvents.remove(app + stream); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java deleted file mode 100755 index c90771b..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋� - * @author lin - */ -@Component -public class RedisPushStreamResponseListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); - - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); - - public interface PushStreamResponseEvent{ - void run(MessageForPushChannelResponse response); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�"); - continue; - } - // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); - } - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); - } - } - }); - } - } - - public void addEvent(String app, String stream, PushStreamResponseEvent callback) { - responseEvents.put(app + stream, callback); - } - - public void removeEvent(String app, String stream) { - responseEvents.remove(app + stream); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java new file mode 100644 index 0000000..7a81eab --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -0,0 +1,203 @@ +package com.genersoft.iot.vmp.service.redisMsg.control; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +/** + * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢� + */ +@Component +public class RedisRpcController { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class); + + @Autowired + private SSRCFactory ssrcFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private ZLMServerFactory zlmServerFactory; + + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + + /** + * 鑾峰彇鍙戞祦鐨勪俊鎭� + */ + public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦 + MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem == null) { + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 鑷钩鍙板唴瀹� + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.info("[redis-rpc] getSendRtpItem->鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�" ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + response.setBody(sendRtpItem); + return response; + } + + /** + * 鐩戝惉娴佷笂绾� + */ + public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦 + MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem != null) { + logger.info("[redis-rpc] 鐩戝惉娴佷笂绾挎椂鍙戠幇娴佸凡瀛樺湪鐩存帴杩斿洖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + RedisRpcResponse response = request.getResponse(); + response.setBody(sendRtpItem); + response.setStatusCode(200); + } + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡紝娴佸凡涓婄嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + RedisRpcResponse response = request.getResponse(); + response.setBody(sendRtpItem); + response.setStatusCode(200); + // 鎵嬪姩鍙戦�佺粨鏋� + sendResponse(response); + + }); + return null; + } + + + /** + * 寮�濮嬪彂娴� + */ + public RedisRpcResponse startSendRtp(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[redis-rpc] startSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (!streamReady) { + logger.info("[redis-rpc] startSendRtp->娴佷笉鍦ㄧ嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (jsonObject.getInteger("code") == 0) { + WVPResult wvpResult = WVPResult.success(); + response.setBody(wvpResult); + }else { + WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg")); + response.setBody(wvpResult); + } + return response; + } + + /** + * 鍋滄鍙戞祦 + */ + public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[redis-rpc] stopSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (jsonObject.getInteger("code") == 0) { + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + WVPResult wvpResult = WVPResult.success(); + response.setBody(wvpResult); + }else { + int code = jsonObject.getInteger("code"); + String msg = jsonObject.getString("msg"); + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦澶辫触锛� {}/{}, code锛� {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(),code, msg ); + WVPResult wvpResult = WVPResult.fail(code, msg); + response.setBody(wvpResult); + } + return response; + } + + private void sendResponse(RedisRpcResponse response){ + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java new file mode 100644 index 0000000..e9a00a9 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -0,0 +1,100 @@ +package com.genersoft.iot.vmp.service.redisMsg.service; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class RedisRpcServiceImpl implements IRedisRpcService { + + + @Autowired + private RedisRpcConfig redisRpcConfig; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private SSRCFactory ssrcFactory; + + private RedisRpcRequest buildRequest(String uri, Object param) { + RedisRpcRequest request = new RedisRpcRequest(); + request.setFromId(userSetting.getServerId()); + request.setParam(param); + request.setUri(uri); + return request; + } + + @Override + public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) { + + RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), SendRtpItem.class); + } + + @Override + public WVPResult startSendRtp(SendRtpItem sendRtpItem) { + RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public WVPResult stopSendRtp(SendRtpItem sendRtpItem) { + RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) { + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + if (callback != null) { + callback.run(sendRtpItem); + } + hookSubscribe.removeSubscribe(hook); + }); + RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + redisRpcConfig.request(request, response -> { + SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class); + if (callback != null) { + callback.run(sendRtpItemFromOther); + } + }); + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 7088837..60ebfab 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -682,19 +682,20 @@ @Override public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); - redisTemplate.opsForValue().set(key, platformPlayTimeout); + redisTemplate.opsForValue().set(key, sendRtpItem); } @Override public SendRtpItem getWaiteSendRtpItem(String app, String stream) { String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; - return (SendRtpItem)redisTemplate.opsForValue().get(key); + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class); } @Override public void sendStartSendRtp(SendRtpItem sendRtpItem) { String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); - redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); + logger.info("[redis鍙戦�侀�氱煡] 閫氱煡鍏朵粬WVP鎺ㄦ祦 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); } @Override -- Gitblit v1.8.0