From f9abfca003bc9515f1f6028657fa6347326a1402 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 15 四月 2024 21:33:22 +0800
Subject: [PATCH] 临时提交
---
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 12
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 386 +++++++++-----------
/dev/null | 474 ------------------------
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 28 +
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 12
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 3
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java | 81 ++++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java | 4
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 2
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java | 81 ++++
11 files changed, 395 insertions(+), 689 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index d19b8f0..af574b9 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -72,6 +72,8 @@
public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
+ public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
+ public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index c14ebcd..fb88f54 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -32,9 +32,6 @@
private RedisStreamMsgListener redisStreamMsgListener;
@Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
- @Autowired
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@Autowired
@@ -48,6 +45,12 @@
@Autowired
private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
+
+ @Autowired
+ private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener;
+
+ @Autowired
+ private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
/**
@@ -65,12 +68,13 @@
container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
- container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
+ container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM));
+ container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
return container;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index 30193d2..c0507df 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -22,6 +22,11 @@
*/
private String platformId;
+ /**
+ * 骞冲彴鍚嶇О
+ */
+ private String platformName;
+
/**
* 瀵瑰簲璁惧id
*/
@@ -60,6 +65,11 @@
* 鏄惁涓簍cp涓诲姩妯″紡
*/
private boolean tcpActive;
+
+ /**
+ * 鑷繁鎺ㄦ祦浣跨敤鐨処P
+ */
+ private String localIp;
/**
* 鑷繁鎺ㄦ祦浣跨敤鐨勭鍙�
@@ -306,6 +316,22 @@
this.receiveStream = receiveStream;
}
+ public String getPlatformName() {
+ return platformName;
+ }
+
+ public void setPlatformName(String platformName) {
+ this.platformName = platformName;
+ }
+
+ public String getLocalIp() {
+ return localIp;
+ }
+
+ public void setLocalIp(String localIp) {
+ this.localIp = localIp;
+ }
+
@Override
public String toString() {
return "SendRtpItem{" +
@@ -313,6 +339,7 @@
", port=" + port +
", ssrc='" + ssrc + '\'' +
", platformId='" + platformId + '\'' +
+ ", platformName='" + platformName + '\'' +
", deviceId='" + deviceId + '\'' +
", app='" + app + '\'' +
", channelId='" + channelId + '\'' +
@@ -320,6 +347,7 @@
", stream='" + stream + '\'' +
", tcp=" + tcp +
", tcpActive=" + tcpActive +
+ ", localIp=" + localIp +
", localPort=" + localPort +
", mediaServerId='" + mediaServerId + '\'' +
", serverId='" + serverId + '\'' +
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 96b8b11..3539922 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -18,6 +18,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -28,7 +29,6 @@
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -127,12 +127,11 @@
@Autowired
private SipConfig config;
-
- @Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
@Autowired
private VideoStreamSessionManager streamSession;
+
+ @Autowired
+ private SendRtpPortManager sendRtpPortManager;
@Override
@@ -577,21 +576,40 @@
}else {
ssrc = gb28181Sdp.getSsrc();
}
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setTcpActive(tcpActive);
+ sendRtpItem.setTcp(mediaTransmissionTCP);
+ sendRtpItem.setRtcp(platform.isRtcp());
+ sendRtpItem.setSsrc(ssrc);
+ sendRtpItem.setPlatformName(platform.getName());
+ sendRtpItem.setPlatformId(platform.getServerGBId());
+ sendRtpItem.setMediaServerId(mediaServerItem.getId());
+ sendRtpItem.setChannelId(channelId);
+ sendRtpItem.setIp(addressStr);
+ sendRtpItem.setPort(port);
+ sendRtpItem.setUsePs(true);
+ sendRtpItem.setApp(gbStream.getApp());
+ sendRtpItem.setStream(gbStream.getStream());
+ sendRtpItem.setCallId(callIdHeader.getCallId());
+ sendRtpItem.setFromTag(request.getFromTag());
+ sendRtpItem.setOnlyAudio(false);
+ sendRtpItem.setPlayType(InviteStreamType.PUSH);
+ sendRtpItem.setStatus(0);
if ("push".equals(gbStream.getStreamType())) {
if (streamPushItem != null) {
// 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦
OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
+
+ sendRtpItem.setServerId(pushListItem.getSeverId());
if (pushListItem != null) {
StreamPushItem transform = streamPushService.transform(pushListItem);
transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
// 鎺ㄦ祦鐘舵��
- pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ pushStream(sendRtpItem, mediaServerItem, platform, request);
}else {
// 鏈帹娴� 鎷夎捣
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
}
} else if ("proxy".equals(gbStream.getStreamType())) {
@@ -601,8 +619,7 @@
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
//寮�鍚唬鐞嗘媺娴�
- notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
}
@@ -659,8 +676,9 @@
sendRtpItem.setStatus(1);
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setFromTag(request.getFromTag());
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@@ -670,19 +688,14 @@
}
- private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
+ private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
// 鎺ㄦ祦
- if (streamPushItem.isSelf()) {
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
+ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady != null && streamReady) {
// 鑷钩鍙板唴瀹�
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
- gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
try {
responseAck(request, Response.BUSY_HERE);
@@ -691,16 +704,11 @@
}
return;
}
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
// 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@@ -708,210 +716,168 @@
} else {
// 涓嶅湪绾� 鎷夎捣
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
-
} else {
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setRtcp(platform.isRtcp());
+ sendRtpItem.setTcp(mediaTransmissionTCP);
+ sendRtpItem.setTcpActive();
// 鍏朵粬骞冲彴鍐呭
- otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ otherWvpPushStream(sendRtpItem, request, platform);
}
}
/**
* 閫氱煡娴佷笂绾�
*/
- private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- if ("proxy".equals(gbStream.getStreamType())) {
- // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
- logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
- // 鐩戝惉娴佷笂绾�
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
- zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
- OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
- logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
- dynamicTask.stop(callIdHeader.getCallId());
- pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- });
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
- zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
- }, userSetting.getPlatformPlayTimeout());
- boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
- if (!start) {
- try {
- responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
- }
- zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
- dynamicTask.stop(callIdHeader.getCallId());
+ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
+ logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
+ // 鐩戝惉娴佷笂绾�
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
+ zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
+ OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
+ logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
+ dynamicTask.stop(callIdHeader.getCallId());
+ pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ });
+ dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
+ logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
+ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+ }, userSetting.getPlatformPlayTimeout());
+ boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+ if (!start) {
+ try {
+ responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
}
- } else if ("push".equals(gbStream.getStreamType())) {
- if (!platform.isStartOfflinePush()) {
- // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
- try {
- logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
- responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
- }
- return;
- }
- // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
- logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
-
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
- gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
- platform.getName(), null, gbStream.getMediaServerId());
- redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
- // 璁剧疆瓒呮椂
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
- try {
- redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
- mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
- responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- }
- }, userSetting.getPlatformPlayTimeout());
- // 娣诲姞鐩戝惉
- int finalPort = port;
- Boolean finalTcpActive = tcpActive;
-
- // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡
- mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
- dynamicTask.stop(callIdHeader.getCallId());
- redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
- if (serverId.equals(userSetting.getServerId())) {
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
- app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
- logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (InvalidArgumentException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (ParseException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- }
- return;
- }
- if (finalTcpActive != null) {
- sendRtpItem.setTcpActive(finalTcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
- } else {
- // 鍏朵粬骞冲彴鍐呭
- otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- }
- });
-
- // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
- redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
- if (response.getCode() != 0) {
- dynamicTask.stop(callIdHeader.getCallId());
- mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
- try {
- responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
- }
- }
- });
+ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+ dynamicTask.stop(callIdHeader.getCallId());
}
}
/**
- * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴�
+ * 閫氱煡娴佷笂绾�
*/
- private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅");
- // 鍙戦�乺edis娑堟伅
- redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
- streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
- channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
- SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
- if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
- logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (InvalidArgumentException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (ParseException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- }
- return;
- }
- // 鏀跺埌sendItem
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
+ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ if (!platform.isStartOfflinePush()) {
+ // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
+ try {
+ logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
+ }
+ return;
+ }
+ // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
+ logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
- }, (wvpResult) -> {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
+ gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
+ platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
+ redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+ // 璁剧疆瓒呮椂
+ dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
+ logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
+ try {
+ redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
+ mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+ responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ }
+ }, userSetting.getPlatformPlayTimeout());
+ // 鍐欏叆redis寰呭彂娴佷俊鎭紝渚涘叾浠杦vp璇诲彇骞剁敓鎴愬彂娴佷俊鎭�
+ SendRtpItem sendRtpItemTemp = new SendRtpItem();
+ sendRtpItemTemp.setIp(addressStr);
+ sendRtpItemTemp.setPort(port);
+ sendRtpItemTemp.setSsrc(ssrc);
+ sendRtpItemTemp.setPlatformId(requesterId);
+ sendRtpItemTemp.setPlatformName(platform.getName());
+ sendRtpItemTemp.setTcp(mediaTransmissionTCP);
+ sendRtpItemTemp.setRtcp(platform.isRtcp());
+ sendRtpItemTemp.setTcpActive(tcpActive);
+ sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
+ redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
+ // 娣诲姞涓婄嚎鐨勯�氱煡
+ mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
+ dynamicTask.stop(callIdHeader.getCallId());
+ redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
+ if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
- // 閿欒
- if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
- // 绂荤嚎
- // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡
- StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
- if (currentStreamPushItem.isPushIng()) {
- // 鍦ㄧ嚎鐘舵��
- pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-
- } else {
- // 涓嶅湪绾� 鎷夎捣
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- }
- }
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
+ logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
try {
responseAck(request, Response.BUSY_HERE);
- } catch (InvalidArgumentException | ParseException | SipException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage());
+ } catch (SipException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ } catch (InvalidArgumentException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ } catch (ParseException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
}
- });
+ return;
+ }
+ sendRtpItemTemp.setLocalPort(localPort);
+ sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItemTemp.setStatus(1);
+ sendRtpItemTemp.setCallId(callIdHeader.getCallId());
+
+ sendRtpItemTemp.setFromTag(request.getFromTag());
+ SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
+ if (response != null) {
+ sendRtpItemTemp.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
+ } else {
+ // 鍏朵粬骞冲彴鍐呭
+ otherWvpPushStream(sendRtpItemFromRedis, request, platform);
+ }
+ });
+
+ // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+ redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+ if (response.getCode() != 0) {
+ dynamicTask.stop(callIdHeader.getCallId());
+ mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+ try {
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
+ }
+ }
+ });
}
- public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
- String sdpIp = mediaServerItem.getSdpIp();
+
+ /**
+ * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴�
+ */
+ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
+ logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅");
+ // 鍙戦�乺edis娑堟伅
+ redisCatchStorage.sendStartSendRtp(sendRtpItem);
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
+ sendRtpItem.setFromTag(request.getFromTag());
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+ if (response != null) {
+ sendRtpItem.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ }
+
+ public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) {
+
+ String sdpIp = sendRtpItem.getLocalIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
sdpIp = platform.getSendStreamIp();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
index 714838e..6b3c94f 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
import java.text.ParseException;
/**
@@ -7,5 +9,5 @@
*/
public interface ChannelOnlineEvent {
- void run(String app, String stream, String serverId) throws ParseException;
+ void run(SendRtpItem sendRtpItem) throws ParseException;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 1a9e3e5..6a4f866 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -61,6 +61,7 @@
messageForPushChannel.setGbId(gbId);
messageForPushChannel.setApp(app);
messageForPushChannel.setStream(stream);
+ messageForPushChannel.setServerId(serverId);
messageForPushChannel.setMediaServerId(mediaServerId);
messageForPushChannel.setPlatFormId(platFormId);
messageForPushChannel.setPlatFormName(platFormName);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
deleted file mode 100755
index 3b990f0..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ /dev/null
@@ -1,474 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾�
- * @author lin
- */
-@Component
-public class RedisGbPlayMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
-
- public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
-
- /**
- * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥
- */
- public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
-
- /**
- * 绂荤嚎鐨勯敊璇帥
- */
- public static final int ERROR_CODE_OFFLINE = -2;
-
- /**
- * 瓒呮椂鐨勯敊璇帥
- */
- public static final int ERROR_CODE_TIMEOUT = -3;
-
- private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
- private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
- private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
-
- @Autowired
- private UserSetting userSetting;
-
-
- @Autowired
- private RedisTemplate<Object, Object> redisTemplate;
-
- @Autowired
- private ZLMServerFactory zlmServerFactory;
-
- @Autowired
- private IMediaServerService mediaServerService;
-
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
-
-
- @Autowired
- private DynamicTask dynamicTask;
-
-
- @Autowired
- private ZlmHttpHookSubscribe subscribe;
-
- private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
- @Qualifier("taskExecutor")
- @Autowired
- private ThreadPoolTaskExecutor taskExecutor;
-
-
- public interface PlayMsgCallback{
- void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
- }
-
- public interface PlayMsgCallbackForStartSendRtpStream{
- void handler(JSONObject jsonObject);
- }
-
- public interface PlayMsgErrorCallback{
- void handler(WVPResult wvpResult);
- }
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- boolean isEmpty = taskQueue.isEmpty();
- taskQueue.offer(message);
- if (isEmpty) {
- taskExecutor.execute(() -> {
- while (!taskQueue.isEmpty()) {
- Message msg = taskQueue.poll();
- try {
- WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
- logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg));
- if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
- continue;
- }
- if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
- logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
-
- switch (wvpRedisMsg.getCmd()){
- case WvpRedisMsgCmd.GET_SEND_ITEM:
- RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
- requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
- RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
- requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
- RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
- requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- default:
- break;
- }
-
- }else {
- logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody()));
- switch (wvpRedisMsg.getCmd()){
- case WvpRedisMsgCmd.GET_SEND_ITEM:
-
- WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
-
- String key = wvpRedisMsg.getSerial();
- switch (content.getCode()) {
- case 0:
- ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
- PlayMsgCallback playMsgCallback = callbacks.get(key);
- if (playMsgCallback != null) {
- callbacksForError.remove(key);
- try {
- playMsgCallback.handler(responseSendItemMsg);
- } catch (ParseException e) {
- logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e);
- }
- }
- break;
- case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
- case ERROR_CODE_OFFLINE:
- case ERROR_CODE_TIMEOUT:
- PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
- if (errorCallback != null) {
- callbacks.remove(key);
- errorCallback.handler(content);
- }
- break;
- default:
- break;
- }
- break;
- case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
- WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
- String serial = wvpRedisMsg.getSerial();
- switch (wvpResult.getCode()) {
- case 0:
- JSONObject jsonObject = (JSONObject)wvpResult.getData();
- PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
- if (playMsgCallback != null) {
- callbacksForError.remove(serial);
- playMsgCallback.handler(jsonObject);
- }
- break;
- case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
- case ERROR_CODE_OFFLINE:
- case ERROR_CODE_TIMEOUT:
- PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
- if (errorCallback != null) {
- callbacks.remove(serial);
- errorCallback.handler(wvpResult);
- }
- break;
- default:
- break;
- }
- break;
- default:
- break;
- }
-
- }
- }catch (Exception e) {
- logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
- logger.error("[RedisGbPlayMsg] 寮傚父鍐呭锛� ", e);
- }
- }
- });
- }
- }
-
- /**
- * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
- */
- private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
- MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
- if (mediaInfo == null) {
- // TODO 鍥炲閿欒
- return;
- }
- String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
- Map<String, Object> param = new HashMap<>();
- param.put("vhost","__defaultVhost__");
- param.put("app",requestPushStreamMsg.getApp());
- param.put("stream",requestPushStreamMsg.getStream());
- param.put("ssrc", requestPushStreamMsg.getSsrc());
- param.put("dst_url",requestPushStreamMsg.getIp());
- param.put("dst_port", requestPushStreamMsg.getPort());
- param.put("is_udp", is_Udp);
- param.put("src_port", requestPushStreamMsg.getSrcPort());
- param.put("pt", requestPushStreamMsg.getPt());
- param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
- param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
- JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param);
- // 鍥炲娑堟伅
- responsePushStream(jsonObject, fromId, serial);
- }
-
- private void responsePushStream(JSONObject content, String toId, String serial) {
-
- WVPResult<JSONObject> result = new WVPResult<>();
- result.setCode(0);
- result.setData(content);
-
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
- WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹�
- */
- private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
- MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
- if (mediaServerItem == null) {
- logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId());
-
- WVPResult<SendRtpItem> result = new WVPResult<>();
- result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
- result.setMsg("娴佸獟浣撲笉瀛樺湪");
-
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
- WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- return;
- }
- // 纭畾娴佹槸鍚﹀湪绾�
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
- if (streamReady != null && streamReady) {
- logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream());
- responseSendItem(mediaServerItem, content, toId, serial);
- }else {
- // 娴佸凡缁忕绾�
- // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
- logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream());
-
- String taskKey = UUID.randomUUID().toString();
- // 璁剧疆瓒呮椂
- dynamicTask.startDelay(taskKey, ()->{
- logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream());
- WVPResult<SendRtpItem> result = new WVPResult<>();
- result.setCode(ERROR_CODE_TIMEOUT);
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
- userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
- );
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }, userSetting.getPlatformPlayTimeout());
-
- // 娣诲姞璁㈤槄
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId());
-
- subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{
- dynamicTask.stop(taskKey);
- responseSendItem(mediaServerItem, content, toId, serial);
- });
-
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
- content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
- content.getMediaServerId());
-
- String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
- logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚姹� {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream());
- redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
- }
- }
-
- /**
- * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘�
- */
- private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
- content.getPort(), content.getSsrc(), content.getPlatformId(),
- content.getApp(), content.getStream(), content.getChannelId(),
- content.getTcp(), content.getRtcp());
-
- WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
- result.setCode(0);
- ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
- responseSendItemMsg.setSendRtpItem(sendRtpItem);
- responseSendItemMsg.setMediaServerItem(mediaServerItem);
- result.setData(responseSendItemMsg);
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
- userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
- );
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭�
- * @param serverId 涓嬬骇鏈嶅姟ID
- * @param app 搴旂敤鍚�
- * @param stream 娴両D
- * @param ip 鐩爣IP
- * @param port 鐩爣绔彛
- * @param ssrc ssrc
- * @param platformId 骞冲彴鍥芥爣缂栧彿
- * @param channelId 閫氶亾ID
- * @param isTcp 鏄惁浣跨敤TCP
- * @param callback 寰楀埌淇℃伅鐨勫洖璋�
- */
- public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
- String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
- RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
- serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
- requestSendItemMsg.setServerId(serverId);
- String key = UUID.randomUUID().toString();
- WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
- key, JSON.toJSONString(requestSendItemMsg));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
- logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject);
- callbacks.put(key, callback);
- callbacksForError.put(key, errorCallback);
- dynamicTask.startDelay(key, ()->{
- callbacks.remove(key);
- callbacksForError.remove(key);
- WVPResult<Object> wvpResult = new WVPResult<>();
- wvpResult.setCode(ERROR_CODE_TIMEOUT);
- wvpResult.setMsg("timeout");
- errorCallback.handler(wvpResult);
- }, userSetting.getPlatformPlayTimeout());
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 鍙戦�佽姹傛帹娴佺殑娑堟伅
- * @param param 鎺ㄦ祦鍙傛暟
- * @param callback 鍥炶皟
- */
- public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
- String key = UUID.randomUUID().toString();
- WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
- WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
- logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject);
- dynamicTask.startDelay(key, ()->{
- callbacksForStartSendRtpStream.remove(key);
- callbacksForError.remove(key);
- }, userSetting.getPlatformPlayTimeout());
- callbacksForStartSendRtpStream.put(key, callback);
- callbacksForError.put(key, (wvpResult)->{
- logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg());
- callbacksForStartSendRtpStream.remove(key);
- callbacksForError.remove(key);
- });
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 鍙戦�佽姹傛帹娴佺殑娑堟伅
- */
- public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
- String key = UUID.randomUUID().toString();
- WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
- WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
- logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
- if (platformGbId == null) {
- platformGbId = "*";
- }
- if (channelId == null) {
- channelId = "*";
- }
- if (streamId == null) {
- streamId = "*";
- }
- if (callId == null) {
- callId = "*";
- }
- String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
- + userSetting.getServerId() + "_*_"
- + platformGbId + "_"
- + channelId + "_"
- + streamId + "_"
- + callId;
- List<Object> scan = RedisUtil.scan(redisTemplate, key);
- if (scan.size() > 0) {
- return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
- }else {
- return null;
- }
- }
-
- /**
- * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
- */
- private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
- SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
- if (sendRtpItem == null) {
- logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL");
- return;
- }
- MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- if (mediaInfo == null) {
- // TODO 鍥炲閿欒
- return;
- }
- Map<String, Object> param = new HashMap<>();
- param.put("vhost","__defaultVhost__");
- param.put("app",sendRtpItem.getApp());
- param.put("stream",sendRtpItem.getStream());
- param.put("ssrc", sendRtpItem.getSsrc());
-
- if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
- logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
- // 鍙戦�乺edis娑堟伅
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
- }
-
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
new file mode 100755
index 0000000..14a96e8
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
@@ -0,0 +1,81 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 鏀跺埌娑堟伅鍚庡紑濮嬬粰涓婄骇鍙戞祦
+ * @author lin
+ */
+@Component
+public class RedisPlatformStartSendRtpListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class);
+
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody()));
+ boolean isEmpty = taskQueue.isEmpty();
+ taskQueue.offer(message);
+ if (isEmpty) {
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ try {
+ MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
+ if (messageForPushChannel == null
+ || ObjectUtils.isEmpty(messageForPushChannel.getApp())
+ || ObjectUtils.isEmpty(messageForPushChannel.getStream())
+ || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
+ continue;
+ }
+
+ // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
+ null);
+ hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+ // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+
+ });
+
+
+ }catch (Exception e) {
+ logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+ logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
new file mode 100755
index 0000000..f3b415d
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
@@ -0,0 +1,81 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅
+ * @author lin
+ */
+@Component
+public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class);
+
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody()));
+ boolean isEmpty = taskQueue.isEmpty();
+ taskQueue.offer(message);
+ if (isEmpty) {
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ try {
+ MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
+ if (messageForPushChannel == null
+ || ObjectUtils.isEmpty(messageForPushChannel.getApp())
+ || ObjectUtils.isEmpty(messageForPushChannel.getStream())
+ || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
+ continue;
+ }
+
+ // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
+ null);
+ hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+ // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+
+ });
+
+
+ }catch (Exception e) {
+ logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+ logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 66db103..1e5e93d 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -217,4 +217,7 @@
void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
+ void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
+
+ void sendStartSendRtp(SendRtpItem sendRtpItem);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 1eac4df..60084df 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -678,4 +678,16 @@
logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鍋滄鍚戜笂绾ф帹娴� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
}
+
+ @Override
+ public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
+ String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
+ redisTemplate.opsForValue().set(key, platformPlayTimeout);
+ }
+
+ @Override
+ public void sendStartSendRtp(SendRtpItem sendRtpItem) {
+ String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
+ redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
+ }
}
--
Gitblit v1.8.0