From 935221ab4112894ad3bc92ed91eff3af8bd2226b Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 08 八月 2022 09:32:38 +0800
Subject: [PATCH] Merge pull request #567 from mrjackwang/wvp-28181-2.0

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |   80 +++++++++++++++++++++++---
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java                         |    7 ++
 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java                     |   29 +++++++++
 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java                        |   39 +++++++++++-
 4 files changed, 139 insertions(+), 16 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index e4b9e3e..34cb753 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,9 +17,11 @@
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,6 +67,8 @@
 
     @Autowired
     private IStreamPushService streamPushService;
+    @Autowired
+    private IStreamProxyService streamProxyService;
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
@@ -142,6 +146,7 @@
 
                 MediaServerItem mediaServerItem = null;
                 StreamPushItem streamPushItem = null;
+                StreamProxyItem proxyByAppAndStream =null;
                 // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦
                 if (channel != null && gbStream == null) {
                     if (channel.getStatus() == 0) {
@@ -171,6 +176,13 @@
                         if ("push".equals(gbStream.getStreamType())) {
                             streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                             if (streamPushItem == null) {
+                                logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
+                                responseAck(evt, Response.GONE);
+                                return;
+                            }
+                        }else if("proxy".equals(gbStream.getStreamType())){
+                            proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
+                            if (proxyByAppAndStream == null) {
                                 logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                 responseAck(evt, Response.GONE);
                                 return;
@@ -416,14 +428,33 @@
                         }
                     }
                 } else if (gbStream != null) {
-                    if (streamPushItem != null && streamPushItem.isPushIng()) {
-                        // 鎺ㄦ祦鐘舵��
-                        pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-                    } else {
-                        // 鏈帹娴� 鎷夎捣
-                        notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                    if("push".equals(gbStream.getStreamType())) {
+                        if (streamPushItem != null && streamPushItem.isPushIng()) {
+                            // 鎺ㄦ祦鐘舵��
+                            pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                        } else {
+                            // 鏈帹娴� 鎷夎捣
+                            notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                        }
+                    }else if ("proxy".equals(gbStream.getStreamType())){
+                        if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
+                            pushProxyStream(evt, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                        }else{
+                            //寮�鍚唬鐞嗘媺娴�
+                            boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+                            if(start1) {
+                                pushProxyStream(evt, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }else{
+                                //澶辫触鍚庨�氱煡
+                                notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
+                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                            }
+                        }
+
                     }
                 }
             }
@@ -442,7 +473,39 @@
     /**
      * 瀹夋帓鎺ㄦ祦
      */
+    private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
+                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
+                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
+                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
+            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+            if (streamReady) {
+                // 鑷钩鍙板唴瀹�
+                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+                        gbStream.getApp(), gbStream.getStream(), channelId,
+                        mediaTransmissionTCP);
 
+                if (sendRtpItem == null) {
+                    logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+                    responseAck(evt, Response.BUSY_HERE);
+                    return;
+                }
+                if (tcpActive != null) {
+                    sendRtpItem.setTcpActive(tcpActive);
+                }
+                sendRtpItem.setPlayType(InviteStreamType.PUSH);
+                // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+                sendRtpItem.setStatus(1);
+                sendRtpItem.setCallId(callIdHeader.getCallId());
+                byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
+                sendRtpItem.setDialog(dialogByteArray);
+                byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
+                sendRtpItem.setTransaction(transactionByteArray);
+                redisCatchStorage.updateSendRTPSever(sendRtpItem);
+                sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
+
+        }
+
+    }
     private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                             CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                             int port, Boolean tcpActive, boolean mediaTransmissionTCP,
@@ -487,7 +550,6 @@
         }
 
     }
-
     /**
      * 閫氱煡娴佷笂绾�
      */
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index c23cfcd..5974918 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -8,6 +8,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,6 +54,9 @@
 
     @Autowired
     private SipConfig sipConfig;
+
+    @Autowired
+    private ZLMRunner zlmRunner;
 
     @Value("${server.ssl.enabled:false}")
     private boolean sslEnabled;
@@ -277,7 +281,13 @@
             return null;
         }
         String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
-        return (MediaServerItem)redisUtil.get(key);
+        MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
+        if(null==serverItem){
+            //zlm鏈嶅姟涓嶅湪绾匡紝鍚姩閲嶈繛
+            reloadZlm();
+            serverItem=(MediaServerItem)redisUtil.get(key);
+        }
+        return serverItem;
     }
 
     @Override
@@ -470,8 +480,13 @@
         String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
 
         if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
-            logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣");
-            return null;
+            logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣锛屽惎鍔ㄩ噸杩炴満鍒�");
+            //鍚姩閲嶈繛
+            reloadZlm();
+            if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
+                logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣");
+                return null;
+            }
         }
 
         // 鑾峰彇鍒嗘暟鏈�浣庣殑锛屽強骞跺彂鏈�浣庣殑
@@ -633,8 +648,14 @@
         MediaServerItem mediaServerItem = getOne(mediaServerId);
         if (mediaServerItem == null) {
             // zlm杩炴帴閲嶈瘯
-            logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
-            return;
+            logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�,灏濊瘯閲嶈繛zlm");
+            reloadZlm();
+            mediaServerItem = getOne(mediaServerId);
+            if (mediaServerItem == null) {
+                // zlm杩炴帴閲嶈瘯
+                logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
+                return;
+            }
         }
         String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
         int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
@@ -657,4 +678,12 @@
         }
     }
 
+    public void reloadZlm(){
+        try {
+            zlmRunner.run();
+            Thread.sleep(500);//寤惰繜0.5绉掔紦鍐叉椂闂�
+        } catch (Exception e) {
+            logger.warn("灏濊瘯閲嶈繛zlm澶辫触锛�",e);
+        }
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 5fa83f5..6c6c04b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -4,6 +4,7 @@
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -78,6 +79,10 @@
     @Autowired
     TransactionDefinition transactionDefinition;
 
+    @Autowired
+    private MediaConfig mediaConfig;
+
+
     @Override
     public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
         if (jsonData == null) {
@@ -142,6 +147,8 @@
         stream.setStreamType("push");
         stream.setStatus(true);
         stream.setCreateTime(DateUtil.getNow());
+        stream.setStreamType("push");
+        stream.setMediaServerId(mediaConfig.getId());
         int add = gbStreamMapper.add(stream);
         return add > 0;
     }
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
index 94fe8df..48973f9 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
@@ -6,6 +6,7 @@
 import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
 import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.IMediaService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -37,6 +38,8 @@
 
     @Autowired
     private IMediaService mediaService;
+    @Autowired
+    private IStreamProxyService streamProxyService;
 
 
     /**
@@ -95,8 +98,30 @@
             result.setMsg("scccess");
             result.setData(streamInfo);
         }else {
-            result.setCode(-1);
-            result.setMsg("fail");
+            //鑾峰彇娴佸け璐ワ紝閲嶅惎鎷夋祦鍚庨噸璇曚竴娆�
+            streamProxyService.stop(app,stream);
+            boolean start = streamProxyService.start(app, stream);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
+                String host = request.getHeader("Host");
+                String localAddr = host.split(":")[0];
+                logger.info("浣跨敤{}浣滀负杩斿洖娴佺殑ip", localAddr);
+                streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
+            }else {
+                streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
+            }
+            if (streamInfo != null){
+                result.setCode(0);
+                result.setMsg("scccess");
+                result.setData(streamInfo);
+            }else {
+                result.setCode(-1);
+                result.setMsg("fail");
+            }
         }
         return result;
     }

--
Gitblit v1.8.0