From ab74d1cff90cc563e0eca8deb8f154d84eb51908 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 14 九月 2022 20:51:21 +0800
Subject: [PATCH] Merge branch 'wvp-28181-2.0'

---
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java |  164 +++++++++++++++++-------------------------------------
 1 files changed, 52 insertions(+), 112 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 5e6e8d7..b855bf7 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,14 +19,11 @@
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IMediaService;
 import com.genersoft.iot.vmp.service.IPlayService;
@@ -35,6 +32,7 @@
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -53,7 +51,6 @@
 
 import javax.sdp.*;
 import javax.sip.*;
-import javax.sip.address.SipURI;
 import javax.sip.header.CallIdHeader;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
@@ -91,7 +88,7 @@
     private DynamicTask dynamicTask;
 
     @Autowired
-    private SIPCommander cmder;
+    private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
     @Autowired
     private IPlayService playService;
@@ -115,6 +112,9 @@
 	private ZLMRESTfulUtils zlmresTfulUtils;
 
     @Autowired
+    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
+
+    @Autowired
     private SIPProcessorObserver sipProcessorObserver;
 
     @Autowired
@@ -130,7 +130,7 @@
 	private DeferredResultHolder resultHolder;
 
 	@Autowired
-	private ZLMHttpHookSubscribe subscribe;
+	private ZlmHttpHookSubscribe subscribe;
 
 	@Autowired
 	private SipConfig config;
@@ -282,16 +282,16 @@
                         String protocol = media.getProtocol();
 
                         // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp
-                        if ("TCP/RTP/AVP".equals(protocol)) {
+                        if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) {
                             String setup = mediaDescription.getAttribute("setup");
                             if (setup != null) {
                                 mediaTransmissionTCP = true;
-                                if ("active".equals(setup)) {
+                                if ("active".equalsIgnoreCase(setup)) {
                                     tcpActive = true;
                                     // 涓嶆敮鎸乼cp涓诲姩
                                     responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 鐩綍涓嶆敮鎸佺偣鎾�
                                     return;
-                                } else if ("passive".equals(setup)) {
+                                } else if ("passive".equalsIgnoreCase(setup)) {
                                     tcpActive = false;
                                 }
                             }
@@ -336,11 +336,11 @@
                         return;
                     }
                     sendRtpItem.setCallId(callIdHeader.getCallId());
-                    sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
+                    sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
 
                     Long finalStartTime = startTime;
                     Long finalStopTime = stopTime;
-                    ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
+                    ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
                         String app = responseJSON.getString("app");
                         String stream = responseJSON.getString("stream");
                         logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream);
@@ -355,7 +355,7 @@
                         content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
                         content.append("s=" + sessionName + "\r\n");
                         content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
-                        if ("Playback".equals(sessionName)) {
+                        if ("Playback".equalsIgnoreCase(sessionName)) {
                             content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
                         } else {
                             content.append("t=0 0\r\n");
@@ -399,7 +399,7 @@
                         }
                     });
                     sendRtpItem.setApp("rtp");
-                    if ("Playback".equals(sessionName)) {
+                    if ("Playback".equalsIgnoreCase(sessionName)) {
                         sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
                         SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
                         sendRtpItem.setStreamId(ssrcInfo.getStream());
@@ -434,7 +434,14 @@
                         if (playTransaction != null) {
                             Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
                             if (!streamReady) {
-                                playTransaction = null;
+                                boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());
+                                if (hasRtpServer) {
+                                    logger.info("[涓婄骇鐐规挱]宸茬粡寮�鍚痳tpServer浣嗘槸灏氭湭鏀跺埌娴侊紝寮�鍚洃鍚祦鐨勫埌鏉�");
+                                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());
+                                    zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);
+                                }else {
+                                    playTransaction = null;
+                                }
                             }
                         }
                         if (playTransaction == null) {
@@ -443,6 +450,7 @@
                                 streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                             }
                             SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
+                            logger.info(JSONObject.toJSONString(ssrcInfo));
                             sendRtpItem.setStreamId(ssrcInfo.getStream());
                             // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
                             redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -581,7 +589,6 @@
             otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                     mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
         }
-
     }
     /**
      * 閫氱煡娴佷笂绾�
@@ -596,7 +603,8 @@
             responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
         } else if ("push".equals(gbStream.getStreamType())) {
             if (!platform.isStartOfflinePush()) {
-                responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
+                // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
+                responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                 return;
             }
             // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
@@ -632,7 +640,7 @@
                             app, stream, channelId, mediaTransmissionTCP);
 
                     if (sendRtpItem == null) {
-                        logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+                        logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
                         try {
                             responseAck(evt, Response.BUSY_HERE);
                         } catch (SipException e) {
@@ -661,6 +669,23 @@
                     // 鍏朵粬骞冲彴鍐呭
                     otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                             mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                }
+            });
+
+            // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+                if (response.getCode() != 0) {
+                    dynamicTask.stop(callIdHeader.getCallId());
+                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                    try {
+                        responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+                    } catch (SipException e) {
+                        throw new RuntimeException(e);
+                    } catch (InvalidArgumentException e) {
+                        throw new RuntimeException(e);
+                    } catch (ParseException e) {
+                        throw new RuntimeException(e);
+                    }
                 }
             });
         }
@@ -779,13 +804,13 @@
         }
     }
 
-    public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException {
+    public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException {
 
         // 闈炰笂绾у钩鍙拌姹傦紝鏌ヨ鏄惁璁惧璇锋眰锛堥�氬父涓烘帴鏀惰闊冲箍鎾殑璁惧锛�
         Device device = redisCatchStorage.getDevice(requesterId);
-        AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1);
+        AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId);
         if (audioBroadcastCatch == null) {
-            logger.warn("鏉ヨ嚜璁惧鐨処nvite璇锋眰闈炶闊冲箍鎾紝宸插拷鐣�");
+            logger.warn("鏉ヨ嚜璁惧鐨処nvite璇锋眰闈炶闊冲箍鎾紝宸插拷鐣ワ紝requesterId锛� {}/{}", requesterId, channelId);
             responseAck(evt, Response.FORBIDDEN);
             return;
         }
@@ -883,101 +908,15 @@
 
             // hook鐩戝惉绛夊緟璁惧鎺ㄦ祦涓婃潵
             // 娣诲姞璁㈤槄
-            JSONObject subscribeKey = new JSONObject();
-            subscribeKey.put("app", app);
-            subscribeKey.put("stream", stream);
-            subscribeKey.put("regist", true);
-            subscribeKey.put("schema", "rtsp");
-            subscribeKey.put("mediaServerId", mediaServerItem.getId());
+            HookSubscribeForStreamChange subscribeKey = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
+
             String finalSsrc = ssrc;
             // 娴佸凡缁忓瓨鍦ㄦ椂鐩存帴鎺ㄦ祦
-//            JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, stream);
-//            System.out.println(mediaInfo != null);
-//            System.out.println(mediaInfo);
-//            if (mediaInfo != null &&
-//                    (mediaInfo.getInteger("code") != null && mediaInfo.getInteger("code") == 0
-//                            && mediaInfo.getJSONArray("data") != null && mediaInfo.getJSONArray("data").size() > 0)) {
-//                logger.info("鍙戠幇宸茬粡鍦ㄦ帹娴�");
-//                JSONArray tracks = mediaInfo.getJSONArray("data").getJSONObject(0).getJSONArray("tracks");
-//                Integer codecId = null;
-//                if (tracks != null && tracks.size() > 0) {
-//                    for (int i = 0; i < tracks.size(); i++) {
-//                        MediaItem.MediaTrack track = JSON.toJavaObject((JSON)tracks.get(i),MediaItem.MediaTrack.class);
-//                        if (track.getCodecType() == 1) {
-//                            codecId = track.getCodecId();
-//                            break;
-//                        }
-//                    }
-//                }
-//                sendRtpItem.setStatus(2);
-//                redisCatchStorage.updateSendRTPSever(sendRtpItem);
-//                StringBuffer content = new StringBuffer(200);
-//                content.append("v=0\r\n");
-//                content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion()  + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-//                content.append("s=Play\r\n");
-//                content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-//                content.append("t=0 0\r\n");
-//                if (codecId == null) {
-//                    if (mediaTransmissionTCP) {
-//                        content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
-//                    }else {
-//                        content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
-//                    }
-//
-//                    content.append("a=rtpmap:8 PCMA/8000\r\n");
-//                }else {
-//                    if (codecId == 4) {
-//                        if (mediaTransmissionTCP) {
-//                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n");
-//                        }else {
-//                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n");
-//                        }
-//                        content.append("a=rtpmap:0 PCMU/8000\r\n");
-//                    }else {
-//                        if (mediaTransmissionTCP) {
-//                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
-//                        }else {
-//                            content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
-//                        }
-//                        content.append("a=rtpmap:8 PCMA/8000\r\n");
-//                    }
-//                }
-//                if (sendRtpItem.isTcp()) {
-//                    content.append("a=connection:new\r\n");
-//                    if (!sendRtpItem.isTcpActive()) {
-//                        content.append("a=setup:active\r\n");
-//                    }else {
-//                        content.append("a=setup:passive\r\n");
-//                    }
-//                }
-//                content.append("a=sendonly\r\n");
-//                content.append("y="+ finalSsrc + "\r\n");
-//                content.append("f=v/////a/1/8/1\r\n");
-//
-//                ParentPlatform parentPlatform = new ParentPlatform();
-//                parentPlatform.setServerIP(device.getIp());
-//                parentPlatform.setServerPort(device.getPort());
-//                parentPlatform.setServerGBId(device.getDeviceId());
-//                try {
-//                    responseSdpAck(evt, content.toString(), parentPlatform);
-//                    Dialog dialog = evt.getDialog();
-//                    audioBroadcastCatch.setDialog((SIPDialog) dialog);
-//                    audioBroadcastCatch.setRequest((SIPRequest) request);
-//                    audioBroadcastManager.update(audioBroadcastCatch);
-//                } catch (SipException e) {
-//                    throw new RuntimeException(e);
-//                } catch (InvalidArgumentException e) {
-//                    throw new RuntimeException(e);
-//                } catch (ParseException e) {
-//                    throw new RuntimeException(e);
-//                }
-//            }else {
-                // 娴佷笉瀛樺湪鏃剁洃鍚祦涓婄嚎
                 // 璁剧疆绛夊緟鎺ㄦ祦鐨勮秴鏃�; 榛樿20s
                 String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId();
                 dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
                     logger.info("绛夊緟鎺ㄦ祦瓒呮椂: {}/{}", app, stream);
-                    subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
+                    subscribe.removeSubscribe(subscribeKey);
                     playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
                     // 鍙戦�乥ye
                     try {
@@ -992,9 +931,10 @@
                 }, 20*1000);
 
                 boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
-                subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
+                subscribe.addSubscribe(subscribeKey,
                         (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                             logger.info("鏀跺埌璇煶瀵硅鎺ㄦ祦");
+                            dynamicTask.stop(waiteStreamTimeoutTaskKey);
                             MediaItem mediaItem = JSON.toJavaObject(json, MediaItem.class);
                             Integer audioCodecId = null;
                             if (mediaItem.getTracks() != null && mediaItem.getTracks().size() > 0) {

--
Gitblit v1.8.0