From 48ca11aaf324d017c08fadabf27c753fdd76c9b7 Mon Sep 17 00:00:00 2001
From: xiangpei <xiangpei@timesnew.cn>
Date: 星期四, 10 十月 2024 20:19:36 +0800
Subject: [PATCH] xxx
---
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 107 insertions(+), 8 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
old mode 100644
new mode 100755
index def639b..b34a1b7
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -1,25 +1,34 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
+import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
@Service
+@DS("master")
public class InviteStreamServiceImpl implements IInviteStreamService {
private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
@@ -29,8 +38,48 @@
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
+ @Autowired
+ private IVideoManagerStorage storage;
+
+ @Autowired
+ private UserSetting userSetting;
+
+ /**
+ * 娴佸埌鏉ョ殑澶勭悊
+ */
+ @Async("taskExecutor")
+ @org.springframework.context.event.EventListener
+ public void onApplicationEvent(MediaArrivalEvent event) {
+// if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+//
+// }
+ }
+
+ /**
+ * 娴佺寮�鐨勫鐞�
+ */
+ @Async("taskExecutor")
+ @EventListener
+ public void onApplicationEvent(MediaDepartureEvent event) {
+ if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+ InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
+ if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
+ removeInviteInfo(inviteInfo);
+ storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
+ }
+ }
+ }
@Override
public void updateInviteInfo(InviteInfo inviteInfo) {
+ if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
+ updateInviteInfo(inviteInfo, Long.valueOf(userSetting.getPlayTimeout()) * 2);
+ }else {
+ updateInviteInfo(inviteInfo, null);
+ }
+ }
+
+ @Override
+ public void updateInviteInfo(InviteInfo inviteInfo, Long time) {
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
logger.warn("[鏇存柊Invite淇℃伅]锛屽弬鏁颁笉鍏細 {}", JSON.toJSON(inviteInfo));
return;
@@ -80,8 +129,13 @@
":" + inviteInfoForUpdate.getType() +
":" + inviteInfoForUpdate.getDeviceId() +
":" + inviteInfoForUpdate.getChannelId() +
- ":" + inviteInfoForUpdate.getStream();
- redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
+ ":" + inviteInfoForUpdate.getStream()+
+ ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
+ if (time != null && time > 0) {
+ redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS);
+ }else {
+ redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
+ }
}
@Override
@@ -96,12 +150,18 @@
":" + inviteInfo.getType() +
":" + inviteInfo.getDeviceId() +
":" + inviteInfo.getChannelId() +
- ":" + stream;
+ ":" + stream +
+ ":" + inviteInfo.getSsrcInfo().getSsrc();
inviteInfoInDb.setStream(stream);
if (inviteInfoInDb.getSsrcInfo() != null) {
inviteInfoInDb.getSsrcInfo().setStream(stream);
}
- redisTemplate.opsForValue().set(key, inviteInfoInDb);
+ if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
+ redisTemplate.opsForValue().set(key, inviteInfoInDb, userSetting.getPlayTimeout() * 2, TimeUnit.SECONDS);
+ }else {
+ redisTemplate.opsForValue().set(key, inviteInfoInDb);
+ }
+
return inviteInfoInDb;
}
@@ -111,10 +171,14 @@
":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") +
- ":" + (stream != null ? stream : "*");
+ ":" + (stream != null ? stream : "*")
+ + ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
- if (scanResult.size() != 1) {
+ if (scanResult.isEmpty()) {
return null;
+ }
+ if (scanResult.size() != 1) {
+ logger.warn("[鑾峰彇InviteInfo] 鍙戠幇 key: {}瀛樺湪澶氭潯", key);
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
@@ -136,7 +200,8 @@
":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") +
- ":" + (stream != null ? stream : "*");
+ ":" + (stream != null ? stream : "*") +
+ ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
if (scanResult.size() > 0) {
for (Object keyObj : scanResult) {
@@ -191,7 +256,7 @@
@Override
public int getStreamInfoCount(String mediaServerId) {
int count = 0;
- String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*";
+ String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
return 0;
@@ -200,6 +265,9 @@
String keyStr = (String) keyObj;
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
+ if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) {
+ continue;
+ }
count++;
}
}
@@ -229,4 +297,35 @@
}
return key;
}
+
+ @Override
+ public InviteInfo getInviteInfoBySSRC(String ssrc) {
+ String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
+ List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
+ if (scanResult.size() != 1) {
+ return null;
+ }
+
+ return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
+ }
+
+ @Override
+ public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
+ InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
+ if (inviteInfoInDb == null) {
+ return null;
+ }
+ removeInviteInfo(inviteInfoInDb);
+ String key = VideoManagerConstants.INVITE_PREFIX +
+ ":" + inviteInfo.getType() +
+ ":" + inviteInfo.getDeviceId() +
+ ":" + inviteInfo.getChannelId() +
+ ":" + inviteInfo.getStream() +
+ ":" + ssrc;
+ if (inviteInfoInDb.getSsrcInfo() != null) {
+ inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
+ }
+ redisTemplate.opsForValue().set(key, inviteInfoInDb);
+ return inviteInfoInDb;
+ }
}
--
Gitblit v1.8.0