From f4f3e60a6b84bb9368ac3d1bc515a96310fed1e8 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 27 十月 2023 15:47:27 +0800
Subject: [PATCH] Merge pull request #1082 from DavidSche/Live_streaming_push
---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java |   98 +++++++++++++++++++++++++++++++++++-------------
 1 files changed, 71 insertions(+), 27 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
old mode 100644
new mode 100755
index 4e73578..ad00baf
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -1,20 +1,34 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
 
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰
@@ -25,11 +39,26 @@
 
     private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
 
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
     @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
+    private IStreamPushService streamPushService;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private IVideoManagerStorage storager;
+
+    @Autowired
+    private ISIPCommanderForPlatform commanderFroPlatform;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private ZLMServerFactory zlmServerFactory;
 
 
     private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
@@ -40,30 +69,45 @@
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
-                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
-                            logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�");
-                            continue;
+        logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody()));
+        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
+        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
+        if (push != null) {
+            if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
+                List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+                        push.getGbId());
+                if (sendRtpItems.size() > 0) {
+                    for (SendRtpItem sendRtpItem : sendRtpItems) {
+                        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                        // 鍋滄鍚戜笂绾ф帹娴�
+                        String streamId = sendRtpItem.getStreamId();
+                        Map<String, Object> param = new HashMap<>();
+                        param.put("vhost","__defaultVhost__");
+                        param.put("app",sendRtpItem.getApp());
+                        param.put("stream",streamId);
+                        param.put("ssrc",sendRtpItem.getSsrc());
+                        logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", streamId);
+                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
+                        zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+
+                        try {
+                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
+                        } catch (SipException | InvalidArgumentException | ParseException e) {
+                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
                         }
-                        // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅
-                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
-                            responseEvents.get(response.getApp() + response.getStream()).run(response);
+                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                    sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                         }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e);
                     }
                 }
-            });
+            }
         }
+
     }
 
     public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
--
Gitblit v1.8.0