From ab74d1cff90cc563e0eca8deb8f154d84eb51908 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 14 九月 2022 20:51:21 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 164 +++++++++++++++++------------------------------------- 1 files changed, 52 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 5e6e8d7..b855bf7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,14 +19,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -35,6 +32,7 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -53,7 +51,6 @@ import javax.sdp.*; import javax.sip.*; -import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; @@ -91,7 +88,7 @@ private DynamicTask dynamicTask; @Autowired - private SIPCommander cmder; + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -115,6 +112,9 @@ private ZLMRESTfulUtils zlmresTfulUtils; @Autowired + private ZlmHttpHookSubscribe zlmHttpHookSubscribe; + + @Autowired private SIPProcessorObserver sipProcessorObserver; @Autowired @@ -130,7 +130,7 @@ private DeferredResultHolder resultHolder; @Autowired - private ZLMHttpHookSubscribe subscribe; + private ZlmHttpHookSubscribe subscribe; @Autowired private SipConfig config; @@ -282,16 +282,16 @@ String protocol = media.getProtocol(); // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp - if ("TCP/RTP/AVP".equals(protocol)) { + if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; - if ("active".equals(setup)) { + if ("active".equalsIgnoreCase(setup)) { tcpActive = true; // 涓嶆敮鎸乼cp涓诲姩 responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 鐩綍涓嶆敮鎸佺偣鎾� return; - } else if ("passive".equals(setup)) { + } else if ("passive".equalsIgnoreCase(setup)) { tcpActive = false; } } @@ -336,11 +336,11 @@ return; } sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); Long finalStartTime = startTime; Long finalStopTime = stopTime; - ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { + ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { String app = responseJSON.getString("app"); String stream = responseJSON.getString("stream"); logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream); @@ -355,7 +355,7 @@ content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); content.append("s=" + sessionName + "\r\n"); content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); - if ("Playback".equals(sessionName)) { + if ("Playback".equalsIgnoreCase(sessionName)) { content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); } else { content.append("t=0 0\r\n"); @@ -399,7 +399,7 @@ } }); sendRtpItem.setApp("rtp"); - if ("Playback".equals(sessionName)) { + if ("Playback".equalsIgnoreCase(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); sendRtpItem.setStreamId(ssrcInfo.getStream()); @@ -434,7 +434,14 @@ if (playTransaction != null) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); if (!streamReady) { - playTransaction = null; + boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream()); + if (hasRtpServer) { + logger.info("[涓婄骇鐐规挱]宸茬粡寮�鍚痳tpServer浣嗘槸灏氭湭鏀跺埌娴侊紝寮�鍚洃鍚祦鐨勫埌鏉�"); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent); + }else { + playTransaction = null; + } } } if (playTransaction == null) { @@ -443,6 +450,7 @@ streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); + logger.info(JSONObject.toJSONString(ssrcInfo)); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -581,7 +589,6 @@ otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } - } /** * 閫氱煡娴佷笂绾� @@ -596,7 +603,8 @@ responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { - responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); return; } // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 @@ -632,7 +640,7 @@ app, stream, channelId, mediaTransmissionTCP); if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { @@ -661,6 +669,23 @@ // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + }); + + // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 + redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(callIdHeader.getCallId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + try { + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } } }); } @@ -779,13 +804,13 @@ } } - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException { + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { // 闈炰笂绾у钩鍙拌姹傦紝鏌ヨ鏄惁璁惧璇锋眰锛堥�氬父涓烘帴鏀惰闊冲箍鎾殑璁惧锛� Device device = redisCatchStorage.getDevice(requesterId); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId); if (audioBroadcastCatch == null) { - logger.warn("鏉ヨ嚜璁惧鐨処nvite璇锋眰闈炶闊冲箍鎾紝宸插拷鐣�"); + logger.warn("鏉ヨ嚜璁惧鐨処nvite璇锋眰闈炶闊冲箍鎾紝宸插拷鐣ワ紝requesterId锛� {}/{}", requesterId, channelId); responseAck(evt, Response.FORBIDDEN); return; } @@ -883,101 +908,15 @@ // hook鐩戝惉绛夊緟璁惧鎺ㄦ祦涓婃潵 // 娣诲姞璁㈤槄 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", app); - subscribeKey.put("stream", stream); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtsp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); + HookSubscribeForStreamChange subscribeKey = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + String finalSsrc = ssrc; // 娴佸凡缁忓瓨鍦ㄦ椂鐩存帴鎺ㄦ祦 -// JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, stream); -// System.out.println(mediaInfo != null); -// System.out.println(mediaInfo); -// if (mediaInfo != null && -// (mediaInfo.getInteger("code") != null && mediaInfo.getInteger("code") == 0 -// && mediaInfo.getJSONArray("data") != null && mediaInfo.getJSONArray("data").size() > 0)) { -// logger.info("鍙戠幇宸茬粡鍦ㄦ帹娴�"); -// JSONArray tracks = mediaInfo.getJSONArray("data").getJSONObject(0).getJSONArray("tracks"); -// Integer codecId = null; -// if (tracks != null && tracks.size() > 0) { -// for (int i = 0; i < tracks.size(); i++) { -// MediaItem.MediaTrack track = JSON.toJavaObject((JSON)tracks.get(i),MediaItem.MediaTrack.class); -// if (track.getCodecType() == 1) { -// codecId = track.getCodecId(); -// break; -// } -// } -// } -// sendRtpItem.setStatus(2); -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// StringBuffer content = new StringBuffer(200); -// content.append("v=0\r\n"); -// content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); -// content.append("s=Play\r\n"); -// content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); -// content.append("t=0 0\r\n"); -// if (codecId == null) { -// if (mediaTransmissionTCP) { -// content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); -// }else { -// content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); -// } -// -// content.append("a=rtpmap:8 PCMA/8000\r\n"); -// }else { -// if (codecId == 4) { -// if (mediaTransmissionTCP) { -// content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n"); -// }else { -// content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n"); -// } -// content.append("a=rtpmap:0 PCMU/8000\r\n"); -// }else { -// if (mediaTransmissionTCP) { -// content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); -// }else { -// content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); -// } -// content.append("a=rtpmap:8 PCMA/8000\r\n"); -// } -// } -// if (sendRtpItem.isTcp()) { -// content.append("a=connection:new\r\n"); -// if (!sendRtpItem.isTcpActive()) { -// content.append("a=setup:active\r\n"); -// }else { -// content.append("a=setup:passive\r\n"); -// } -// } -// content.append("a=sendonly\r\n"); -// content.append("y="+ finalSsrc + "\r\n"); -// content.append("f=v/////a/1/8/1\r\n"); -// -// ParentPlatform parentPlatform = new ParentPlatform(); -// parentPlatform.setServerIP(device.getIp()); -// parentPlatform.setServerPort(device.getPort()); -// parentPlatform.setServerGBId(device.getDeviceId()); -// try { -// responseSdpAck(evt, content.toString(), parentPlatform); -// Dialog dialog = evt.getDialog(); -// audioBroadcastCatch.setDialog((SIPDialog) dialog); -// audioBroadcastCatch.setRequest((SIPRequest) request); -// audioBroadcastManager.update(audioBroadcastCatch); -// } catch (SipException e) { -// throw new RuntimeException(e); -// } catch (InvalidArgumentException e) { -// throw new RuntimeException(e); -// } catch (ParseException e) { -// throw new RuntimeException(e); -// } -// }else { - // 娴佷笉瀛樺湪鏃剁洃鍚祦涓婄嚎 // 璁剧疆绛夊緟鎺ㄦ祦鐨勮秴鏃�; 榛樿20s String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ logger.info("绛夊緟鎺ㄦ祦瓒呮椂: {}/{}", app, stream); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + subscribe.removeSubscribe(subscribeKey); playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); // 鍙戦�乥ye try { @@ -992,9 +931,10 @@ }, 20*1000); boolean finalMediaTransmissionTCP = mediaTransmissionTCP; - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + subscribe.addSubscribe(subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ logger.info("鏀跺埌璇煶瀵硅鎺ㄦ祦"); + dynamicTask.stop(waiteStreamTimeoutTaskKey); MediaItem mediaItem = JSON.toJavaObject(json, MediaItem.class); Integer audioCodecId = null; if (mediaItem.getTracks() != null && mediaItem.getTracks().size() > 0) { -- Gitblit v1.8.0