src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -34,23 +34,13 @@ @Autowired private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private RedisCloseStreamMsgListener redisCloseStreamMsgListener; @Autowired private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; @Autowired private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener; @Autowired private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; @Autowired private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; private RedisRpcConfig redisRpcConfig; /** @@ -69,12 +59,8 @@ container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED)); container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE)); container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); return container; } } src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
New file @@ -0,0 +1,205 @@ package com.genersoft.iot.vmp.conf.redis; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; @Component public class RedisRpcConfig implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class); public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY"; private final Random random = new Random(); @Autowired private UserSetting userSetting; @Autowired private RedisRpcController redisRpcController; @Autowired private RedisTemplate<Object, Object> redisTemplate; private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override public void onMessage(Message message, byte[] pattern) { boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class); if (redisRpcMessage.getRequest() != null) { handlerRequest(redisRpcMessage.getRequest()); } else if (redisRpcMessage.getResponse() != null){ handlerResponse(redisRpcMessage.getResponse()); } else { logger.error("[redis rpc 解析失败] {}", JSON.toJSONString(redisRpcMessage)); } } catch (Exception e) { logger.error("[redis rpc 解析异常] ", e); } } }); } } private void handlerResponse(RedisRpcResponse response) { if (userSetting.getServerId().equals(response.getToId())) { return; } response(response); } private void handlerRequest(RedisRpcRequest request) { try { if (userSetting.getServerId().equals(request.getFromId())) { return; } Method method = getMethod(request.getUri()); // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404 if (userSetting.getServerId().equals(request.getToId())) { if (method == null) { // 回复404结果 RedisRpcResponse response = request.getResponse(); response.setStatusCode(404); sendResponse(response); return; } RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); if(response != null) { sendResponse(response); } }else { if (method == null) { return; } RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); if (response != null) { sendResponse(response); } } }catch (InvocationTargetException | IllegalAccessException e) { logger.error("[redis rpc ] 处理请求失败 ", e); } } private Method getMethod(String name) { // 启动后扫描所有的路径注解 Method[] methods = redisRpcController.getClass().getMethods(); for (Method method : methods) { if (method.getName().equals(name)) { return method; } } return null; } private void sendResponse(RedisRpcResponse response){ response.setToId(userSetting.getServerId()); RedisRpcMessage message = new RedisRpcMessage(); message.setResponse(response); redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); } private void sendRequest(RedisRpcRequest request){ RedisRpcMessage message = new RedisRpcMessage(); message.setRequest(request); redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); } private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>(); private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>(); public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { request.setSn((long) random.nextInt(1000) + 1); SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn()); try { sendRequest(request); return subscribe.poll(timeOut, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); } finally { this.unsubscribe(request.getSn()); } return null; } public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) { request.setSn((long) random.nextInt(1000) + 1); setCallback(request.getSn(), callback); sendRequest(request); } public Boolean response(RedisRpcResponse response) { SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn()); CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn()); if (queue != null) { try { return queue.offer(response, 2, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("{}", e.getMessage(), e); } }else if (callback != null) { callback.run(response); callbacks.remove(response.getSn()); } return false; } private void unsubscribe(long key) { topicSubscribers.remove(key); } private SynchronousQueue<RedisRpcResponse> subscribe(long key) { SynchronousQueue<RedisRpcResponse> queue = null; if (!topicSubscribers.containsKey(key)) topicSubscribers.put(key, queue = new SynchronousQueue<>()); return queue; } private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) { if (!callbacks.containsKey(key)) { callbacks.put(key, callback); } } public void removeCallback(long key) { callbacks.remove(key); } } src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
New file @@ -0,0 +1,24 @@ package com.genersoft.iot.vmp.conf.redis.bean; public class RedisRpcMessage { private RedisRpcRequest request; private RedisRpcResponse response; public RedisRpcRequest getRequest() { return request; } public void setRequest(RedisRpcRequest request) { this.request = request; } public RedisRpcResponse getResponse() { return response; } public void setResponse(RedisRpcResponse response) { this.response = response; } } src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
New file @@ -0,0 +1,93 @@ package com.genersoft.iot.vmp.conf.redis.bean; /** * 通过redis发送请求 */ public class RedisRpcRequest { /** * 来自的WVP ID */ private String fromId; /** * 目标的WVP ID */ private String toId; /** * 序列号 */ private long sn; /** * 访问的路径 */ private String uri; /** * 参数 */ private Object param; public String getFromId() { return fromId; } public void setFromId(String fromId) { this.fromId = fromId; } public String getToId() { return toId; } public void setToId(String toId) { this.toId = toId; } public String getUri() { return uri; } public void setUri(String uri) { this.uri = uri; } public Object getParam() { return param; } public void setParam(Object param) { this.param = param; } public long getSn() { return sn; } public void setSn(long sn) { this.sn = sn; } @Override public String toString() { return "RedisRpcRequest{" + "fromId='" + fromId + '\'' + ", toId='" + toId + '\'' + ", sn='" + sn + '\'' + ", uri='" + uri + '\'' + ", param=" + param + '}'; } public RedisRpcResponse getResponse() { RedisRpcResponse response = new RedisRpcResponse(); response.setFromId(fromId); response.setToId(toId); response.setSn(sn); response.setUri(uri); return response; } } src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
New file @@ -0,0 +1,87 @@ package com.genersoft.iot.vmp.conf.redis.bean; /** * 通过redis发送回复 */ public class RedisRpcResponse { /** * 来自的WVP ID */ private String fromId; /** * 目标的WVP ID */ private String toId; /** * 序列号 */ private long sn; /** * 状态码 */ private int statusCode; /** * 访问的路径 */ private String uri; /** * 参数 */ private Object body; public String getFromId() { return fromId; } public void setFromId(String fromId) { this.fromId = fromId; } public String getToId() { return toId; } public void setToId(String toId) { this.toId = toId; } public long getSn() { return sn; } public void setSn(long sn) { this.sn = sn; } public int getStatusCode() { return statusCode; } public void setStatusCode(int statusCode) { this.statusCode = statusCode; } public String getUri() { return uri; } public void setUri(String uri) { this.uri = uri; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -15,9 +15,11 @@ import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -54,6 +56,8 @@ @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private IRedisRpcService redisRpcService; @Autowired private UserSetting userSetting; @@ -113,7 +117,15 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { redisCatchStorage.sendStartSendRtp(sendRtpItem); // redisCatchStorage.sendStartSendRtp(sendRtpItem); WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem); if (wvpResult.getCode() == 0) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); } } else { JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -18,7 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -97,6 +97,9 @@ @Autowired private IStreamPushService pushService; @Autowired private IRedisRpcService redisRpcService; @Override public void afterPropertiesSet() throws Exception { @@ -134,13 +137,8 @@ if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { // 查询这路流是否是本平台的 StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); if (push!= null && !push.isSelf()) { // 不是本平台的就发送redis消息让其他wvp停止发流 ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); // redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); } if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { redisRpcService.stopSendRtp(sendRtpItem); }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -29,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -86,16 +85,13 @@ private IRedisCatchStorage redisCatchStorage; @Autowired private IInviteStreamService inviteStreamService; private IRedisRpcService redisRpcService; @Autowired private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -120,9 +116,6 @@ @Autowired private UserSetting userSetting; @Autowired private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -594,15 +587,17 @@ sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 从redis查询是否正在接收这个推流 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); if (pushListItem != null) { sendRtpItem.setServerId(pushListItem.getSeverId()); sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 推流状态 // 开始推流 sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { if (!platform.isStartOfflinePush()) { @@ -702,8 +697,6 @@ } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); @@ -714,7 +707,6 @@ sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { // 不在线 拉起 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); @@ -769,18 +761,14 @@ dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } }, userSetting.getPlatformPlayTimeout()); redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); // 添加上线的通知 mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { // redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { dynamicTask.stop(sendRtpItem.getCallId()); redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -813,19 +801,7 @@ // 其他平台内容 otherWvpPushStream(sendRtpItemFromRedis, request, platform); } }); // 添加回复的拒绝或者错误的通知 redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); } } }); } @@ -836,12 +812,9 @@ */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[级联点播]直播流来自其他平台,发送redis消息"); // 发送redis消息 redisCatchStorage.sendStartSendRtp(sendRtpItem); sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(request.getCallIdHeader().getCallId()); sendRtpItem.setFromTag(request.getFromTag()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -23,7 +23,6 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -99,9 +98,6 @@ @Autowired private EventPublisher eventPublisher; @Autowired private RedisPlatformPushStreamOnlineLister zlmMediaListManager; @Autowired private ZlmHttpHookSubscribe subscribe; src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -365,4 +365,13 @@ } return result; } public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) { Map<String, Object> param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); param.put("stream", sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); return zlmresTfulUtils.startSendRtp(mediaServerItem, param); } } src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import java.util.List; @@ -97,4 +96,5 @@ List<MediaServerItem> getAllWithAssistPort(); MediaServerItem getMediaServerByAppAndStream(String app, String stream); } src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -36,19 +35,15 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import java.io.File; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * 媒体服务器节点管理 @@ -751,6 +746,22 @@ @Override public List<MediaServerItem> getAllWithAssistPort() { return mediaServerMapper.queryAllWithAssistPort(); } @Override public MediaServerItem getMediaServerByAppAndStream(String app, String stream) { List<MediaServerItem> mediaServerItemList = getAllOnline(); if (mediaServerItemList.isEmpty()) { return null; } for (MediaServerItem mediaServerItem : mediaServerItemList) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady) { return mediaServerItem; } } return null; } } src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
New file @@ -0,0 +1,16 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem); WVPResult startSendRtp(SendRtpItem sendRtpItem); void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback); WVPResult stopSendRtp(SendRtpItem sendRtpItem); } src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
New file @@ -0,0 +1,203 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; /** * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用 */ @Component public class RedisRpcController { private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class); @Autowired private SSRCFactory ssrcFactory; @Autowired private IMediaServerService mediaServerService; @Autowired private SendRtpPortManager sendRtpPortManager; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private UserSetting userSetting; @Autowired private ZlmHttpHookSubscribe hookSubscribe; @Autowired private ZLMServerFactory zlmServerFactory; @Autowired private RedisTemplate<Object, Object> redisTemplate; /** * 获取发流的信息 */ public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 获取发流的信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); // 查询本级是否有这个流 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaServerItem == null) { RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); } // 自平台内容 int localPort = sendRtpPortManager.getNextPort(mediaServerItem); if (localPort == 0) { logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); response.setBody(sendRtpItem); return response; } /** * 监听流上线 */ public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 监听流上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); // 查询本级是否有这个流 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaServerItem != null) { logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem); response.setStatusCode(200); } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); // 读取redis中的上级点播信息,生成sendRtpItm发送出去 if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); sendRtpItem.setSsrc(ssrc); } sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem); response.setStatusCode(200); // 手动发送结果 sendResponse(response); }); return null; } /** * 开始发流 */ public RedisRpcResponse startSendRtp(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (!streamReady) { logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); } JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { WVPResult wvpResult = WVPResult.success(); response.setBody(wvpResult); }else { WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg")); response.setBody(wvpResult); } return response; } /** * 停止发流 */ public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 停止推流: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); } JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 停止推流成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); WVPResult wvpResult = WVPResult.success(); response.setBody(wvpResult); }else { int code = jsonObject.getInteger("code"); String msg = jsonObject.getString("msg"); logger.info("[redis-rpc] 停止推流失败: {}/{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(),code, msg ); WVPResult wvpResult = WVPResult.fail(code, msg); response.setBody(wvpResult); } return response; } private void sendResponse(RedisRpcResponse response){ response.setToId(userSetting.getServerId()); RedisRpcMessage message = new RedisRpcMessage(); message.setResponse(response); redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); } } src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
New file @@ -0,0 +1,100 @@ package com.genersoft.iot.vmp.service.redisMsg.service; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RedisRpcServiceImpl implements IRedisRpcService { @Autowired private RedisRpcConfig redisRpcConfig; @Autowired private UserSetting userSetting; @Autowired private ZlmHttpHookSubscribe hookSubscribe; @Autowired private SSRCFactory ssrcFactory; private RedisRpcRequest buildRequest(String uri, Object param) { RedisRpcRequest request = new RedisRpcRequest(); request.setFromId(userSetting.getServerId()); request.setParam(param); request.setUri(uri); return request; } @Override public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) { RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), SendRtpItem.class); } @Override public WVPResult startSendRtp(SendRtpItem sendRtpItem) { RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override public WVPResult stopSendRtp(SendRtpItem sendRtpItem) { RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) { // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { // 读取redis中的上级点播信息,生成sendRtpItm发送出去 if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); sendRtpItem.setSsrc(ssrc); } sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); if (callback != null) { callback.run(sendRtpItem); } hookSubscribe.removeSubscribe(hook); }); RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, response -> { SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class); if (callback != null) { callback.run(sendRtpItemFromOther); } }); } } src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -682,19 +682,20 @@ @Override public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); redisTemplate.opsForValue().set(key, platformPlayTimeout); redisTemplate.opsForValue().set(key, sendRtpItem); } @Override public SendRtpItem getWaiteSendRtpItem(String app, String stream) { String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; return (SendRtpItem)redisTemplate.opsForValue().get(key); return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class); } @Override public void sendStartSendRtp(SendRtpItem sendRtpItem) { String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); logger.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); } @Override