From bf6e09d231f49fb0c2cd5a81f6b31cc64d27c368 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 17 四月 2024 12:56:22 +0800
Subject: [PATCH] 修复多wvp国标级联机制
---
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 18 ++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 73 +++++------
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 15 +-
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 37 ++---
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java | 5
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java | 57 +++++++++
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 7
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 76 ++++++++++++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java | 17 ++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java | 2
src/main/resources/application.yml | 2
12 files changed, 237 insertions(+), 76 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index 5ddaed3..dcf2830 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -42,6 +42,9 @@
@Autowired
private RedisRpcConfig redisRpcConfig;
+ @Autowired
+ private RedisPushStreamResponseListener redisPushStreamCloseResponseListener;
+
/**
* redis娑堟伅鐩戝惉鍣ㄥ鍣� 鍙互娣诲姞澶氫釜鐩戝惉涓嶅悓璇濋鐨剅edis鐩戝惉鍣紝鍙渶瑕佹妸娑堟伅鐩戝惉鍣ㄥ拰鐩稿簲鐨勬秷鎭闃呭鐞嗗櫒缁戝畾锛岃娑堟伅鐩戝惉鍣�
@@ -61,6 +64,7 @@
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
+ container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
return container;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 2c134fd..2e0ecf5 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -15,9 +15,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -135,29 +133,22 @@
logger.info("[鏀跺埌bye] 鍋滄鎺ㄦ祦锛歿}, 濯掍綋鑺傜偣锛� {}", streamId, sendRtpItem.getMediaServerId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
- // 鏌ヨ杩欒矾娴佹槸鍚︽槸鏈钩鍙扮殑
- StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
- if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
- redisRpcService.stopSendRtp(sendRtpItem);
- }else {
- MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
- callIdHeader.getCallId(), null);
- zlmServerFactory.stopSendRtpStream(mediaInfo, param);
- if (userSetting.getUseCustomSsrcForParentInvite()) {
- mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
- }
-
- ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
- if (platform != null) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(platform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
+ if (platform != null) {
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+ if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+ redisRpcService.stopSendRtp(sendRtpItem);
}else {
- logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
+ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+ callIdHeader.getCallId(), null);
+ zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+ if (userSetting.getUseCustomSsrcForParentInvite()) {
+ mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
+ }
}
+ }else {
+ logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
}
}else {
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 59ff50c..a51dd37 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -29,6 +29,7 @@
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -125,6 +126,9 @@
@Autowired
private SendRtpPortManager sendRtpPortManager;
+
+ @Autowired
+ private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Override
@@ -759,6 +763,7 @@
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
// 璁剧疆瓒呮椂
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+ redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
try {
responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
@@ -801,7 +806,18 @@
// 鍏朵粬骞冲彴鍐呭
otherWvpPushStream(sendRtpItemFromRedis, request, platform);
}
-
+ });
+ // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+ redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
+ if (response.getCode() != 0) {
+ dynamicTask.stop(sendRtpItem.getCallId());
+ redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
+ try {
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
+ }
+ }
});
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 32bf76a..2a45ad8 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -10,19 +10,22 @@
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -77,6 +80,10 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+
+
+ @Autowired
+ private IRedisRpcService redisRpcService;
@Autowired
private IInviteStreamService inviteStreamService;
@@ -511,41 +518,33 @@
}
if (sendRtpItem.getApp().equals(param.getApp())) {
- logger.info(sendRtpItem.toString());
- if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
- // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
- redisCatchStorage.sendPushStreamClose(messageForPushChannel);
- }else {
- String platformId = sendRtpItem.getPlatformId();
- ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
- Device device = deviceService.getDevice(platformId);
-
- try {
- if (platform != null) {
- commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
- redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
- sendRtpItem.getCallId(), sendRtpItem.getStream());
- } else {
- cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
- if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
- || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
- AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- if (audioBroadcastCatch != null) {
- // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
- logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- }
+ ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+ Device device = deviceService.getDevice(sendRtpItem.getPlatformId());
+ try {
+ if (platform != null) {
+ commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+ redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getCallId(), sendRtpItem.getStream());
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+ } else if (device != null) {
+ cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+ if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+ || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ if (audioBroadcastCatch != null) {
+ // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+ logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
}
- } catch (SipException | InvalidArgumentException | ParseException |
- SsrcTransactionNotFoundException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+ }else {
+ // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+ redisRpcService.rtpSendStopped(sendRtpItem);
}
+ } catch (SipException | InvalidArgumentException | ParseException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
}
-
}
}
}
@@ -593,11 +592,7 @@
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform);
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
index ec24abe..b503fab 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -372,6 +372,6 @@
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
- return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
+ return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
index a601ae9..8d1b7f0 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -13,4 +13,9 @@
void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
WVPResult stopSendRtp(SendRtpItem sendRtpItem);
+
+ void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
+
+ void rtpSendStopped(SendRtpItem sendRtpItem);
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
new file mode 100644
index 0000000..c90771b
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -0,0 +1,76 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 鎺ユ敹redis杩斿洖鐨勬帹娴佺粨鏋�
+ * @author lin
+ */
+@Component
+public class RedisPushStreamResponseListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
+
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+
+ private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
+
+ public interface PushStreamResponseEvent{
+ void run(MessageForPushChannelResponse response);
+ }
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
+ boolean isEmpty = taskQueue.isEmpty();
+ taskQueue.offer(message);
+ if (isEmpty) {
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ try {
+ MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
+ if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
+ logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
+ continue;
+ }
+ // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
+ if (responseEvents.get(response.getApp() + response.getStream()) != null) {
+ responseEvents.get(response.getApp() + response.getStream()).run(response);
+ }
+ }catch (Exception e) {
+ logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+ logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
+ }
+ }
+ });
+ }
+ }
+
+ public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
+ responseEvents.put(app + stream, callback);
+ }
+
+ public void removeEvent(String app, String stream) {
+ responseEvents.remove(app + stream);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
index 7a81eab..682f564 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -7,8 +7,10 @@
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -18,12 +20,17 @@
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
/**
* 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢�
@@ -57,6 +64,14 @@
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
+
+
+ @Autowired
+ private ISIPCommanderForPlatform commanderFroPlatform;
+
+
+ @Autowired
+ private IVideoManagerStorage storager;
/**
@@ -133,6 +148,20 @@
return null;
}
+ /**
+ * 鍋滄鐩戝惉娴佷笂绾�
+ */
+ public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
+ SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+ logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+
+ // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+ hookSubscribe.removeSubscribe(hook);
+ return null;
+ }
+
/**
* 寮�濮嬪彂娴�
@@ -194,6 +223,34 @@
return response;
}
+ /**
+ * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜�
+ */
+ public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
+ SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+ logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+ SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId());
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ if (sendRtpItemInCatch == null) {
+ return response;
+ }
+ String platformId = sendRtpItem.getPlatformId();
+ ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+ if (platform == null) {
+ return response;
+ }
+ try {
+ commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+ redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+ sendRtpItem.getCallId(), sendRtpItem.getStream());
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+ }
+ return response;
+ }
+
private void sendResponse(RedisRpcResponse response){
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
index e9a00a9..f11b9aa 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -97,4 +97,21 @@
});
}
+
+ @Override
+ public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+ hookSubscribe.removeSubscribe(hook);
+ RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
+ request.setToId(sendRtpItem.getServerId());
+ redisRpcConfig.request(request, 10);
+ }
+
+ @Override
+ public void rtpSendStopped(SendRtpItem sendRtpItem) {
+ RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem);
+ request.setToId(sendRtpItem.getServerId());
+ redisRpcConfig.request(request, 10);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 108bc17..0cf6c39 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -2,10 +2,7 @@
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.SystemAllInfo;
-import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -207,7 +204,7 @@
void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
- void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
+ void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 60ebfab..d1e8052 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -5,10 +5,7 @@
import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -644,9 +641,15 @@
}
@Override
- public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+ public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
+
+ MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ msg.setPlatFormIndex(platform.getId());
+
String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
- logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+ logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 3f47844..69f947e 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -2,4 +2,4 @@
application:
name: wvp
profiles:
- active: local
\ No newline at end of file
+ active: local2
\ No newline at end of file
--
Gitblit v1.8.0