From 2b1f7a47394363e95deb4dfa0f1c67d41e747f7f Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 01 二月 2023 10:56:40 +0800
Subject: [PATCH] Merge branch 'wvp-28181-2.0' into fix-269

---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java |  393 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 393 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
new file mode 100644
index 0000000..35ed99e
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -0,0 +1,393 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.*;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾�
+ * @author lin
+ */
+@Component
+public class RedisGbPlayMsgListener implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
+
+    public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
+
+    /**
+     * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥
+     */
+    public static final  int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
+
+    /**
+     * 绂荤嚎鐨勯敊璇帥
+     */
+    public static final  int ERROR_CODE_OFFLINE = -2;
+
+    /**
+     * 瓒呮椂鐨勯敊璇帥
+     */
+    public static final  int ERROR_CODE_TIMEOUT = -3;
+
+    private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
+    private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
+    private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
+
+    @Autowired
+    private UserSetting userSetting;
+
+
+    @Autowired
+    private ZLMMediaListManager zlmMediaListManager;
+
+    @Autowired
+    private ZLMRTPServerFactory zlmrtpServerFactory;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private DynamicTask dynamicTask;
+
+    @Autowired
+    private ZLMMediaListManager mediaListManager;
+
+    @Autowired
+    private ZlmHttpHookSubscribe subscribe;
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
+    public interface PlayMsgCallback{
+        void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
+    }
+
+    public interface PlayMsgCallbackForStartSendRtpStream{
+        void handler(JSONObject jsonObject);
+    }
+
+    public interface PlayMsgErrorCallback{
+        void handler(WVPResult wvpResult);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
+                        WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
+                        if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
+                            continue;
+                        }
+                        if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
+                            logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
+
+                            switch (wvpRedisMsg.getCmd()){
+                                case WvpRedisMsgCmd.GET_SEND_ITEM:
+                                    RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent());
+                                    requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+                                    break;
+                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
+                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
+                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+                                    break;
+                                default:
+                                    break;
+                            }
+
+                        }else {
+                            logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody()));
+                            switch (wvpRedisMsg.getCmd()){
+                                case WvpRedisMsgCmd.GET_SEND_ITEM:
+
+                                   WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
+
+                                    String key = wvpRedisMsg.getSerial();
+                                    switch (content.getCode()) {
+                                        case 0:
+                                           ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
+                                            PlayMsgCallback playMsgCallback = callbacks.get(key);
+                                            if (playMsgCallback != null) {
+                                                callbacksForError.remove(key);
+                                                try {
+                                                    playMsgCallback.handler(responseSendItemMsg);
+                                                } catch (ParseException e) {
+                                                    logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e);
+                                                }
+                                            }
+                                            break;
+                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
+                                        case ERROR_CODE_OFFLINE:
+                                        case ERROR_CODE_TIMEOUT:
+                                            PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
+                                            if (errorCallback != null) {
+                                                callbacks.remove(key);
+                                                errorCallback.handler(content);
+                                            }
+                                            break;
+                                        default:
+                                            break;
+                                    }
+                                    break;
+                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
+                                    WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
+                                    String serial = wvpRedisMsg.getSerial();
+                                    switch (wvpResult.getCode()) {
+                                        case 0:
+                                            JSONObject jsonObject = (JSONObject)wvpResult.getData();
+                                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
+                                            if (playMsgCallback != null) {
+                                                callbacksForError.remove(serial);
+                                                playMsgCallback.handler(jsonObject);
+                                            }
+                                            break;
+                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
+                                        case ERROR_CODE_OFFLINE:
+                                        case ERROR_CODE_TIMEOUT:
+                                            PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
+                                            if (errorCallback != null) {
+                                                callbacks.remove(serial);
+                                                errorCallback.handler(wvpResult);
+                                            }
+                                            break;
+                                        default:
+                                            break;
+                                    }
+                                    break;
+                                default:
+                                    break;
+                            }
+
+                        }
+                    }catch (Exception e) {
+                        logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, {}",e.getMessage());
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
+     */
+    private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
+        MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
+        if (mediaInfo == null) {
+            // TODO 鍥炲閿欒
+            return;
+        }
+        String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
+        Map<String, Object> param = new HashMap<>();
+        param.put("vhost","__defaultVhost__");
+        param.put("app",requestPushStreamMsg.getApp());
+        param.put("stream",requestPushStreamMsg.getStream());
+        param.put("ssrc", requestPushStreamMsg.getSsrc());
+        param.put("dst_url",requestPushStreamMsg.getIp());
+        param.put("dst_port", requestPushStreamMsg.getPort());
+        param.put("is_udp", is_Udp);
+        param.put("src_port", requestPushStreamMsg.getSrcPort());
+        param.put("pt", requestPushStreamMsg.getPt());
+        param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
+        param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
+        JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
+        // 鍥炲娑堟伅
+        responsePushStream(jsonObject, fromId, serial);
+    }
+
+    private void responsePushStream(JSONObject content, String toId, String serial) {
+
+        WVPResult<JSONObject> result = new WVPResult<>();
+        result.setCode(0);
+        result.setData(content);
+
+        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
+                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
+        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
+        RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+    }
+
+    /**
+     * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹�
+     */
+    private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
+        MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId());
+
+            WVPResult<SendRtpItem> result = new WVPResult<>();
+            result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
+            result.setMsg("娴佸獟浣撲笉瀛樺湪");
+
+            WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
+                    WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
+
+            JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
+            RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+            return;
+        }
+        // 纭畾娴佹槸鍚﹀湪绾�
+        boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
+        if (streamReady) {
+            logger.info("[鍥炲鎺ㄦ祦淇℃伅]  {}/{}", content.getApp(), content.getStream());
+            responseSendItem(mediaServerItem, content, toId, serial);
+        }else {
+            // 娴佸凡缁忕绾�
+            // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
+            logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream());
+
+            String taskKey = UUID.randomUUID().toString();
+            // 璁剧疆瓒呮椂
+            dynamicTask.startDelay(taskKey, ()->{
+                logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream());
+                WVPResult<SendRtpItem> result = new WVPResult<>();
+                result.setCode(ERROR_CODE_TIMEOUT);
+                WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
+                        userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
+                );
+                JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
+                RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+            }, userSetting.getPlatformPlayTimeout());
+
+            // 娣诲姞璁㈤槄
+            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId());
+
+            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+                        dynamicTask.stop(taskKey);
+                        responseSendItem(mediaServerItem, content, toId, serial);
+                    });
+
+            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
+                    content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
+                    content.getMediaServerId());
+            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+
+        }
+    }
+
+    /**
+     * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘�
+     */
+    private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
+        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
+                content.getPort(), content.getSsrc(), content.getPlatformId(),
+                content.getApp(), content.getStream(), content.getChannelId(),
+                content.getTcp(), content.getRtcp());
+
+        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
+        result.setCode(0);
+        ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
+        responseSendItemMsg.setSendRtpItem(sendRtpItem);
+        responseSendItemMsg.setMediaServerItem(mediaServerItem);
+        result.setData(responseSendItemMsg);
+
+        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
+                userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
+        );
+        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
+        RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+    }
+
+    /**
+     * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭�
+     * @param serverId 涓嬬骇鏈嶅姟ID
+     * @param app 搴旂敤鍚�
+     * @param stream 娴両D
+     * @param ip 鐩爣IP
+     * @param port 鐩爣绔彛
+     * @param ssrc  ssrc
+     * @param platformId 骞冲彴鍥芥爣缂栧彿
+     * @param channelId 閫氶亾ID
+     * @param isTcp 鏄惁浣跨敤TCP
+     * @param callback 寰楀埌淇℃伅鐨勫洖璋�
+     */
+    public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
+                        String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
+        RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
+                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
+        requestSendItemMsg.setServerId(serverId);
+        String key = UUID.randomUUID().toString();
+        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
+                key, requestSendItemMsg);
+
+        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
+        logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject);
+        callbacks.put(key, callback);
+        callbacksForError.put(key, errorCallback);
+        dynamicTask.startDelay(key, ()->{
+            callbacks.remove(key);
+            callbacksForError.remove(key);
+            WVPResult<Object> wvpResult = new WVPResult<>();
+            wvpResult.setCode(ERROR_CODE_TIMEOUT);
+            wvpResult.setMsg("timeout");
+            errorCallback.handler(wvpResult);
+        }, userSetting.getPlatformPlayTimeout());
+        RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+    }
+
+    /**
+     * 鍙戦�佽姹傛帹娴佺殑娑堟伅
+     * @param param 鎺ㄦ祦鍙傛暟
+     * @param callback 鍥炶皟
+     */
+    public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
+        String key = UUID.randomUUID().toString();
+        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
+                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);
+
+        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
+        logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject);
+        dynamicTask.startDelay(key, ()->{
+            callbacksForStartSendRtpStream.remove(key);
+            callbacksForError.remove(key);
+        }, userSetting.getPlatformPlayTimeout());
+        callbacksForStartSendRtpStream.put(key, callback);
+        callbacksForError.put(key, (wvpResult)->{
+            logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg());
+            callbacksForStartSendRtpStream.remove(key);
+            callbacksForError.remove(key);
+        });
+        RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
+    }
+}

--
Gitblit v1.8.0