From 7d530099878dbcf474182bbbfc694ecfe74c9fbd Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 07 四月 2024 20:06:48 +0800
Subject: [PATCH] 优化推流通知

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |   22 +++--
 src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java                 |    5 -
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java                                   |   29 +++++++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java                              |   53 ++++++++-----
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                               |    2 
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java                        |    6 +
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                                |    2 
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java                    |   98 +++++++++++------------
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java    |    2 
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java                        |    4 -
 10 files changed, 132 insertions(+), 91 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index 361bdc6..30193d2 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -305,4 +305,33 @@
     public void setReceiveStream(String receiveStream) {
         this.receiveStream = receiveStream;
     }
+
+    @Override
+    public String toString() {
+        return "SendRtpItem{" +
+                "ip='" + ip + '\'' +
+                ", port=" + port +
+                ", ssrc='" + ssrc + '\'' +
+                ", platformId='" + platformId + '\'' +
+                ", deviceId='" + deviceId + '\'' +
+                ", app='" + app + '\'' +
+                ", channelId='" + channelId + '\'' +
+                ", status=" + status +
+                ", stream='" + stream + '\'' +
+                ", tcp=" + tcp +
+                ", tcpActive=" + tcpActive +
+                ", localPort=" + localPort +
+                ", mediaServerId='" + mediaServerId + '\'' +
+                ", serverId='" + serverId + '\'' +
+                ", CallId='" + CallId + '\'' +
+                ", fromTag='" + fromTag + '\'' +
+                ", toTag='" + toTag + '\'' +
+                ", pt=" + pt +
+                ", usePs=" + usePs +
+                ", onlyAudio=" + onlyAudio +
+                ", rtcp=" + rtcp +
+                ", playType=" + playType +
+                ", receiveStream='" + receiveStream + '\'' +
+                '}';
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 7004820..242e5ef 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -116,7 +116,7 @@
 
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
-			if (mediaInfo == null) {
+			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
 				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
 						sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
 						sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 3205498..74fad54 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -575,14 +575,20 @@
                     }
 
                     if ("push".equals(gbStream.getStreamType())) {
-                        if (streamPushItem != null && streamPushItem.isPushIng()) {
-                            // 鎺ㄦ祦鐘舵��
-                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-                        } else {
-                            // 鏈帹娴� 鎷夎捣
-                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                        if (streamPushItem != null) {
+                            // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦
+                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
+                            if (pushListItem != null) {
+                                StreamPushItem transform = streamPushService.transform(pushListItem);
+                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
+                                // 鎺ㄦ祦鐘舵��
+                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }else {
+                                // 鏈帹娴� 鎷夎捣
+                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }
                         }
                     } else if ("proxy".equals(gbStream.getStreamType())) {
                         if (null != proxyByAppAndStream) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 5feb306..3a7f452 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -509,32 +509,43 @@
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
 
-                                try {
-                                    if (platform != null) {
-                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                    } else {
-                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
-                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
-                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                            if (audioBroadcastCatch != null) {
-                                                // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
-                                                logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+//                                    redisCatchStorage.sendRtp
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
+
+                                    try {
+                                        if (platform != null) {
+                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        } else {
+                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                if (audioBroadcastCatch != null) {
+                                                    // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+                                                    logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                }
                                             }
                                         }
+                                    } catch (SipException | InvalidArgumentException | ParseException |
+                                             SsrcTransactionNotFoundException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                     }
-                                } catch (SipException | InvalidArgumentException | ParseException |
-                                         SsrcTransactionNotFoundException e) {
-                                    logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                 }
+
                             }
                         }
                     }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index c8d203f..8c0e9b0 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1170,7 +1170,7 @@
             dynamicTask.startDelay(key, ()->{
                 logger.info("[璇煶骞挎挱]绛夊緟invite娑堟伅瓒呮椂锛歿}/{}", device.getDeviceId(), channelId);
                 stopAudioBroadcast(device.getDeviceId(), channelId);
-            }, 2000);
+            }, 10*1000);
         }, eventResultForError -> {
             // 鍙戦�佸け璐�
             logger.error("璇煶骞挎挱鍙戦�佸け璐ワ細 {}:{}", channelId, eventResultForError.msg);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index 59c5ace..f0230f7 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -8,7 +8,6 @@
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -25,7 +24,6 @@
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
 import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -333,8 +331,6 @@
             result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                     param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
         }
-        System.out.println("addStreamProxyToZlm====");
-        System.out.println(result);
         if (result != null && result.getInteger("code") == 0) {
             JSONObject data = result.getJSONObject("data");
             if (data == null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
index f5f2948..0912f0b 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -1,11 +1,7 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
-
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,52 +37,52 @@
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        if (steamMsgJson == null) {
-                            logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
-                            continue;
-                        }
-                        String serverId = steamMsgJson.getString("serverId");
-
-                        if (userSetting.getServerId().equals(serverId)) {
-                            // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
-                            continue;
-                        }
-                        logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
-                        String app = steamMsgJson.getString("app");
-                        String stream = steamMsgJson.getString("stream");
-                        boolean register = steamMsgJson.getBoolean("register");
-                        String mediaServerId = steamMsgJson.getString("mediaServerId");
-                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
-                        onStreamChangedHookParam.setSeverId(serverId);
-                        onStreamChangedHookParam.setApp(app);
-                        onStreamChangedHookParam.setStream(stream);
-                        onStreamChangedHookParam.setRegist(register);
-                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
-                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
-                        onStreamChangedHookParam.setAliveSecond(0L);
-                        onStreamChangedHookParam.setTotalReaderCount("0");
-                        onStreamChangedHookParam.setOriginType(0);
-                        onStreamChangedHookParam.setOriginTypeStr("0");
-                        onStreamChangedHookParam.setOriginTypeStr("unknown");
-                        if (register) {
-                            zlmMediaListManager.addPush(onStreamChangedHookParam);
-                        }else {
-                            zlmMediaListManager.removeMedia(app, stream);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e);
-                    }
-                }
-            });
-        }
+//        boolean isEmpty = taskQueue.isEmpty();
+//        taskQueue.offer(message);
+//        if (isEmpty) {
+//            taskExecutor.execute(() -> {
+//                while (!taskQueue.isEmpty()) {
+//                    Message msg = taskQueue.poll();
+//                    try {
+//                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
+//                        if (steamMsgJson == null) {
+//                            logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
+//                            continue;
+//                        }
+//                        String serverId = steamMsgJson.getString("serverId");
+//
+//                        if (userSetting.getServerId().equals(serverId)) {
+//                            // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
+//                            continue;
+//                        }
+//                        logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
+//                        String app = steamMsgJson.getString("app");
+//                        String stream = steamMsgJson.getString("stream");
+//                        boolean register = steamMsgJson.getBoolean("register");
+//                        String mediaServerId = steamMsgJson.getString("mediaServerId");
+//                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
+//                        onStreamChangedHookParam.setSeverId(serverId);
+//                        onStreamChangedHookParam.setApp(app);
+//                        onStreamChangedHookParam.setStream(stream);
+//                        onStreamChangedHookParam.setRegist(register);
+//                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
+//                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
+//                        onStreamChangedHookParam.setAliveSecond(0L);
+//                        onStreamChangedHookParam.setTotalReaderCount("0");
+//                        onStreamChangedHookParam.setOriginType(0);
+//                        onStreamChangedHookParam.setOriginTypeStr("0");
+//                        onStreamChangedHookParam.setOriginTypeStr("unknown");
+//                        if (register) {
+//                            zlmMediaListManager.addPush(onStreamChangedHookParam);
+//                        }else {
+//                            zlmMediaListManager.removeMedia(app, stream);
+//                        }
+//                    }catch (Exception e) {
+//                        logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
+//                        logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e);
+//                    }
+//                }
+//            });
+//        }
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 78fd280..06b1d56 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -211,5 +211,7 @@
 
     void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
 
+    OnStreamChangedHookParam getPushListItem(String app, String stream);
+
     void removePushListItem(String app, String stream, String mediaServerId);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 18a037d..a1a0033 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -657,6 +657,12 @@
     }
 
     @Override
+    public OnStreamChangedHookParam getPushListItem(String app, String stream) {
+        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
+        return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
+    }
+
+    @Override
     public void removePushListItem(String app, String stream, String mediaServerId) {
         String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
         OnStreamChangedHookParam param = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java
index b3d1990..b37a3d9 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java
@@ -1,12 +1,8 @@
 package com.genersoft.iot.vmp.vmanager.cloudRecord;
 
 import com.alibaba.fastjson2.JSONArray;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.conf.security.JwtUtils;
-import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.ICloudRecordService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -22,7 +18,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;

--
Gitblit v1.8.0