From 1fc2916c2b4b28fbf722c4401e559805f9578573 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 28 四月 2024 22:25:58 +0800
Subject: [PATCH] Merge pull request #1432 from AlphaWu/Zafu-Dev-20240428
---
 src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java |  277 ++++++++++++++++++++++---------------------------------
 1 files changed, 111 insertions(+), 166 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
old mode 100644
new mode 100755
index ed73dd1..6e00960
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -1,17 +1,23 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.alibaba.fastjson2.JSON;
+import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.InviteInfo;
 import com.genersoft.iot.vmp.common.InviteSessionStatus;
 import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
@@ -20,6 +26,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 
 @Service
+@DS("master")
 public class InviteStreamServiceImpl implements IInviteStreamService {
 
     private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
@@ -28,6 +35,35 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IVideoManagerStorage storage;
+
+    /**
+     * 娴佸埌鏉ョ殑澶勭悊
+     */
+    @Async("taskExecutor")
+    @org.springframework.context.event.EventListener
+    public void onApplicationEvent(MediaArrivalEvent event) {
+//        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+//
+//        }
+    }
+
+    /**
+     * 娴佺寮�鐨勫鐞�
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+            InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
+            if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
+                removeInviteInfo(inviteInfo);
+                storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
+            }
+        }
+    }
 
     @Override
     public void updateInviteInfo(InviteInfo inviteInfo) {
@@ -77,23 +113,50 @@
 
         }
         String key = VideoManagerConstants.INVITE_PREFIX +
-                "_" + inviteInfoForUpdate.getType() +
-                "_" + inviteInfoForUpdate.getDeviceId() +
-                "_" + inviteInfoForUpdate.getChannelId() +
-                "_" + inviteInfoForUpdate.getStream();
+                ":" + inviteInfoForUpdate.getType() +
+                ":" + inviteInfoForUpdate.getDeviceId() +
+                ":" + inviteInfoForUpdate.getChannelId() +
+                ":" + inviteInfoForUpdate.getStream()+
+                ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
         redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
+    }
+
+    @Override
+    public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
+
+        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
+        if (inviteInfoInDb == null) {
+            return null;
+        }
+        removeInviteInfo(inviteInfoInDb);
+        String key = VideoManagerConstants.INVITE_PREFIX +
+                ":" + inviteInfo.getType() +
+                ":" + inviteInfo.getDeviceId() +
+                ":" + inviteInfo.getChannelId() +
+                ":" + stream +
+                ":" + inviteInfo.getSsrcInfo().getSsrc();
+        inviteInfoInDb.setStream(stream);
+        if (inviteInfoInDb.getSsrcInfo() != null) {
+            inviteInfoInDb.getSsrcInfo().setStream(stream);
+        }
+        redisTemplate.opsForValue().set(key, inviteInfoInDb);
+        return inviteInfoInDb;
     }
 
     @Override
     public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
         String key = VideoManagerConstants.INVITE_PREFIX +
-                "_" + (type != null ? type : "*") +
-                "_" + (deviceId != null ? deviceId : "*") +
-                "_" + (channelId != null ? channelId : "*") +
-                "_" + (stream != null ? stream : "*");
+                ":" + (type != null ? type : "*") +
+                ":" + (deviceId != null ? deviceId : "*") +
+                ":" + (channelId != null ? channelId : "*") +
+                ":" + (stream != null ? stream : "*")
+                + ":*";
         List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        if (scanResult.size() != 1) {
+        if (scanResult.isEmpty()) {
             return null;
+        }
+        if (scanResult.size() != 1) {
+            logger.warn("[鑾峰彇InviteInfo] 鍙戠幇 key: {}瀛樺湪澶氭潯", key);
         }
 
         return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
@@ -112,10 +175,11 @@
     @Override
     public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
         String scanKey = VideoManagerConstants.INVITE_PREFIX +
-                "_" + (type != null ? type : "*") +
-                "_" + (deviceId != null ? deviceId : "*") +
-                "_" + (channelId != null ? channelId : "*") +
-                "_" + (stream != null ? stream : "*");
+                ":" + (type != null ? type : "*") +
+                ":" + (deviceId != null ? deviceId : "*") +
+                ":" + (channelId != null ? channelId : "*") +
+                ":" + (stream != null ? stream : "*") +
+                ":*";
         List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
         if (scanResult.size() > 0) {
             for (Object keyObj : scanResult) {
@@ -152,24 +216,11 @@
 
     }
 
-    @Override
-    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
-        String key = buildKey(type, deviceId, channelId, stream);
-        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
-        if (callbacks == null) {
-            return;
-        }
-        for (ErrorCallback<Object> callback : callbacks) {
-            callback.run(code, msg, data);
-        }
-        inviteErrorCallbackMap.remove(key);
-    }
-
     private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
-        String key = type + "_" +  deviceId + "_" + channelId;
+        String key = type + ":" +  deviceId + ":" + channelId;
         // 濡傛灉ssrc鏈猲ull閭d箞鍙互瀹炵幇涓�涓�氶亾鍙兘涓�娆℃搷浣滐紝ssrc涓嶄负null鍒欏彲浠ユ敮鎸佷竴涓�氶亾澶氭invite
         if (stream != null) {
-            key += ("_" + stream);
+            key += (":" + stream);
         }
         return key;
     }
@@ -183,7 +234,7 @@
     @Override
     public int getStreamInfoCount(String mediaServerId) {
         int count = 0;
-        String key = VideoManagerConstants.INVITE_PREFIX + "_*_*_*_*";
+        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
         List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
         if (scanResult.size() == 0) {
             return 0;
@@ -199,69 +250,9 @@
         return count;
     }
 
-    /*======================璁惧涓诲瓙鐮佹祦閫昏緫START=========================*/
-
     @Override
-    public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId, boolean isSubStream) {
-        return getInviteInfo(type, deviceId, channelId,isSubStream, null);
-    }
-
-    @Override
-    public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId, boolean isSubStream) {
-        removeInviteInfo(inviteSessionType, deviceId, channelId,isSubStream, null);
-    }
-
-    @Override
-    public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream) {
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                "_" + (type != null ? type : "*") +
-                "_" + (isSubStream ? "sub" : "main") +
-                "_" + (deviceId != null ? deviceId : "*") +
-                "_" + (channelId != null ? channelId : "*") +
-                "_" + (stream != null ? stream : "*");
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        if (scanResult.size() != 1) {
-            return null;
-        }
-        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
-    }
-
-    @Override
-    public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
-        String scanKey = VideoManagerConstants.INVITE_PREFIX +
-                "_" + (type != null ? type : "*") +
-                "_" + (isSubStream ? "sub" : "main") +
-                "_" + (deviceId != null ? deviceId : "*") +
-                "_" + (channelId != null ? channelId : "*") +
-                "_" + (stream != null ? stream : "*");
-        List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
-        if (scanResult.size() > 0) {
-            for (Object keyObj : scanResult) {
-                String key = (String) keyObj;
-                InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
-                if (inviteInfo == null) {
-                    continue;
-                }
-                redisTemplate.delete(key);
-                inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
-            }
-        }
-    }
-
-    @Override
-    public void once(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, ErrorCallback<Object> callback) {
-        String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
-        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
-        if (callbacks == null) {
-            callbacks = new CopyOnWriteArrayList<>();
-            inviteErrorCallbackMap.put(key, callbacks);
-        }
-        callbacks.add(callback);
-    }
-
-    @Override
-    public void call(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, int code, String msg, Object data) {
-        String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
+    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
+        String key = buildSubStreamKey(type, deviceId, channelId, stream);
         List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
         if (callbacks == null) {
             return;
@@ -273,89 +264,43 @@
     }
 
 
-    private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
-        String key = type + "_" + (isSubStream ? "sub":"main") + "_" +  deviceId + "_" + channelId;
+    private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
+        String key = type + ":" + ":" +  deviceId + ":" + channelId;
         // 濡傛灉ssrc涓簄ull閭d箞鍙互瀹炵幇涓�涓�氶亾鍙兘涓�娆℃搷浣滐紝ssrc涓嶄负null鍒欏彲浠ユ敮鎸佷竴涓�氶亾澶氭invite
         if (stream != null) {
-            key += ("_" + stream);
+            key += (":" + stream);
         }
         return key;
     }
-    @Override
-    public void updateInviteInfoSub(InviteInfo inviteInfo) {
-        if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
-            logger.warn("[鏇存柊Invite淇℃伅]锛屽弬鏁颁笉鍏細 {}", JSON.toJSON(inviteInfo));
-            return;
-        }
-        InviteInfo inviteInfoForUpdate = null;
-
-        if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
-            if (inviteInfo.getDeviceId() == null
-                    || inviteInfo.getChannelId() == null
-                    || inviteInfo.getType() == null
-                    || inviteInfo.getStream() == null
-            ) {
-                return;
-            }
-            inviteInfoForUpdate = inviteInfo;
-        } else {
-            InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
-                    inviteInfo.getChannelId(),inviteInfo.isSubStream(), inviteInfo.getStream());
-            if (inviteInfoInRedis == null) {
-                logger.warn("[鏇存柊Invite淇℃伅]锛屾湭浠庣紦瀛樹腑璇诲彇鍒癐nvite淇℃伅锛� deviceId: {}, channel: {}, stream: {}",
-                        inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
-                return;
-            }
-            if (inviteInfo.getStreamInfo() != null) {
-                inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
-            }
-            if (inviteInfo.getSsrcInfo() != null) {
-                inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
-            }
-            if (inviteInfo.getStreamMode() != null) {
-                inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
-            }
-            if (inviteInfo.getReceiveIp() != null) {
-                inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
-            }
-            if (inviteInfo.getReceivePort() != null) {
-                inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
-            }
-            if (inviteInfo.getStatus() != null) {
-                inviteInfoInRedis.setStatus(inviteInfo.getStatus());
-            }
-
-            inviteInfoForUpdate = inviteInfoInRedis;
-
-        }
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                "_" + inviteInfoForUpdate.getType() +
-                "_" + (inviteInfoForUpdate.isSubStream() ? "sub":"main") +
-                "_" + inviteInfoForUpdate.getDeviceId() +
-                "_" + inviteInfoForUpdate.getChannelId() +
-                "_" + inviteInfoForUpdate.getStream();
-        redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
-    }
 
     @Override
-    public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream, boolean isSubStream) {
-        return getInviteInfo(type, null, null,isSubStream, stream);
-    }
-
-    @Override
-    public List<Object> getInviteInfos(InviteSessionType type, String deviceId, String channelId, String stream) {
-        String key = VideoManagerConstants.INVITE_PREFIX +
-                "_" + (type != null ? type : "*") +
-                "_" + (deviceId != null ? deviceId : "*") +
-                "_" + (channelId != null ? channelId : "*") +
-                "_" + (stream != null ? stream : "*");
+    public InviteInfo getInviteInfoBySSRC(String ssrc) {
+        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
         List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
-        return scanResult;
+        if (scanResult.size() != 1) {
+            return null;
+        }
+
+        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
     }
 
-    /*======================璁惧涓诲瓙鐮佹祦閫昏緫END=========================*/
-
-
-
-
+    @Override
+    public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
+        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
+        if (inviteInfoInDb == null) {
+            return null;
+        }
+        removeInviteInfo(inviteInfoInDb);
+        String key = VideoManagerConstants.INVITE_PREFIX +
+                ":" + inviteInfo.getType() +
+                ":" + inviteInfo.getDeviceId() +
+                ":" + inviteInfo.getChannelId() +
+                ":" + inviteInfo.getStream() +
+                ":" + ssrc;
+        if (inviteInfoInDb.getSsrcInfo() != null) {
+            inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
+        }
+        redisTemplate.opsForValue().set(key, inviteInfoInDb);
+        return inviteInfoInDb;
+    }
 }
--
Gitblit v1.8.0