From 8b0ff3767b23f9479a0b3ebe8e343ed6470bd194 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 09 八月 2022 14:41:21 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 265 ++++++++++++++++++++++++++++++++-------------------- 1 files changed, 163 insertions(+), 102 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 04982c3..5e6e8d7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -81,6 +82,9 @@ private IStreamPushService streamPushService; @Autowired + private IStreamProxyService streamProxyService; + + @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired @@ -94,7 +98,7 @@ @Autowired private ISIPCommander commander; - + @Autowired private AudioBroadcastManager audioBroadcastManager; @@ -153,10 +157,7 @@ // Invite Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽姹傝棰戞寚浠� try { Request request = evt.getRequest(); - SipURI sipUri = (SipURI) request.getRequestURI(); - //浠巗ubject璇诲彇channelId,涓嶅啀浠巖equest-line璇诲彇銆� 鏈変簺骞冲彴request-line鏄钩鍙板浗鏍囩紪鐮侊紝涓嶆槸璁惧鍥芥爣缂栫爜銆� - //String channelId = sipURI.getUser(); - String channelId = SipUtils.getChannelIdFromHeader(request); + String channelId = SipUtils.getChannelIdFromRequest(request); String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (requesterId == null || channelId == null) { @@ -178,6 +179,7 @@ MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; + StreamProxyItem proxyByAppAndStream =null; // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦 if (channel != null && gbStream == null) { if (channel.getStatus() == 0) { @@ -207,6 +209,13 @@ if ("push".equals(gbStream.getStreamType())) { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null) { + logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } + }else if("proxy".equals(gbStream.getStreamType())){ + proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); + if (proxyByAppAndStream == null) { logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; @@ -452,18 +461,35 @@ } } } else if (gbStream != null) { - if (streamPushItem.isStatus()) { - // 鍦ㄧ嚎鐘舵�� - pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if("push".equals(gbStream.getStreamType())) { + if (streamPushItem != null && streamPushItem.isPushIng()) { + // 鎺ㄦ祦鐘舵�� + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 鏈帹娴� 鎷夎捣 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + }else if ("proxy".equals(gbStream.getStreamType())){ + if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){ + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else{ + //寮�鍚唬鐞嗘媺娴� + boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + if(start1) { + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else{ + //澶辫触鍚庨�氱煡 + notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } + } - } - } } catch (SipException | InvalidArgumentException | ParseException e) { @@ -480,13 +506,45 @@ /** * 瀹夋帓鎺ㄦ祦 */ + private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady) { + // 鑷钩鍙板唴瀹� + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + if (sendRtpItem == null) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + responseAck(evt, Response.BUSY_HERE); + return; + } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + + } + + } private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { // 鎺ㄦ祦 - if (streamPushItem.getServerId().equals(userSetting.getServerId())) { + if (streamPushItem.isSelf()) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady) { // 鑷钩鍙板唴瀹� @@ -525,7 +583,6 @@ } } - /** * 閫氱煡娴佷笂绾� */ @@ -535,7 +592,7 @@ String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { if ("proxy".equals(gbStream.getStreamType())) { // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽惎鐢ㄦ祦鍚庡紑濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { @@ -543,7 +600,7 @@ return; } // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), @@ -553,7 +610,7 @@ dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); try { - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.REQUEST_TIMEOUT); // 瓒呮椂 } catch (SipException e) { e.printStackTrace(); @@ -568,7 +625,7 @@ Boolean finalTcpActive = tcpActive; // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { + mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, @@ -656,7 +713,7 @@ // 绂荤嚎 // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡 StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isStatus()) { + if (currentStreamPushItem.isPushIng()) { // 鍦ㄧ嚎鐘舵�� pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); @@ -830,87 +887,91 @@ subscribeKey.put("app", app); subscribeKey.put("stream", stream); subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); + subscribeKey.put("schema", "rtsp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); String finalSsrc = ssrc; // 娴佸凡缁忓瓨鍦ㄦ椂鐩存帴鎺ㄦ祦 - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", stream); - JSONArray tracks = mediaInfo.getJSONArray("tracks"); - Integer codecId = null; - if (tracks != null && tracks.size() > 0) { - for (int i = 0; i < tracks.size(); i++) { - MediaItem.MediaTrack track = JSON.toJavaObject((JSON)tracks.get(i),MediaItem.MediaTrack.class); - if (track.getCodecType() == 1) { - codecId = track.getCodecId(); - break; - } - } - } - if ((mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"))) { - logger.info("鍙戠幇宸茬粡鍦ㄦ帹娴�"); - sendRtpItem.setStatus(2); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); - if (codecId == null) { - if (mediaTransmissionTCP) { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); - }else { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); - } - - content.append("a=rtpmap:8 PCMA/8000\r\n"); - }else { - if (codecId == 4) { - if (mediaTransmissionTCP) { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n"); - }else { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n"); - } - content.append("a=rtpmap:0 PCMU/8000\r\n"); - }else { - if (mediaTransmissionTCP) { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); - }else { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); - } - content.append("a=rtpmap:8 PCMA/8000\r\n"); - } - } - if (sendRtpItem.isTcp()) { - content.append("a=connection:new\r\n"); - if (!sendRtpItem.isTcpActive()) { - content.append("a=setup:active\r\n"); - }else { - content.append("a=setup:passive\r\n"); - } - } - content.append("a=sendonly\r\n"); - content.append("y="+ finalSsrc + "\r\n"); - content.append("f=v/////a/1/8/1\r\n"); - - ParentPlatform parentPlatform = new ParentPlatform(); - parentPlatform.setServerIP(device.getIp()); - parentPlatform.setServerPort(device.getPort()); - parentPlatform.setServerGBId(device.getDeviceId()); - try { - responseSdpAck(evt, content.toString(), parentPlatform); - Dialog dialog = evt.getDialog(); - audioBroadcastCatch.setDialog((SIPDialog) dialog); - audioBroadcastCatch.setRequest((SIPRequest) request); - audioBroadcastManager.update(audioBroadcastCatch); - } catch (SipException e) { - throw new RuntimeException(e); - } catch (InvalidArgumentException e) { - throw new RuntimeException(e); - } catch (ParseException e) { - throw new RuntimeException(e); - } - }else { +// JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, stream); +// System.out.println(mediaInfo != null); +// System.out.println(mediaInfo); +// if (mediaInfo != null && +// (mediaInfo.getInteger("code") != null && mediaInfo.getInteger("code") == 0 +// && mediaInfo.getJSONArray("data") != null && mediaInfo.getJSONArray("data").size() > 0)) { +// logger.info("鍙戠幇宸茬粡鍦ㄦ帹娴�"); +// JSONArray tracks = mediaInfo.getJSONArray("data").getJSONObject(0).getJSONArray("tracks"); +// Integer codecId = null; +// if (tracks != null && tracks.size() > 0) { +// for (int i = 0; i < tracks.size(); i++) { +// MediaItem.MediaTrack track = JSON.toJavaObject((JSON)tracks.get(i),MediaItem.MediaTrack.class); +// if (track.getCodecType() == 1) { +// codecId = track.getCodecId(); +// break; +// } +// } +// } +// sendRtpItem.setStatus(2); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// StringBuffer content = new StringBuffer(200); +// content.append("v=0\r\n"); +// content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); +// content.append("s=Play\r\n"); +// content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); +// content.append("t=0 0\r\n"); +// if (codecId == null) { +// if (mediaTransmissionTCP) { +// content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); +// }else { +// content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); +// } +// +// content.append("a=rtpmap:8 PCMA/8000\r\n"); +// }else { +// if (codecId == 4) { +// if (mediaTransmissionTCP) { +// content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n"); +// }else { +// content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n"); +// } +// content.append("a=rtpmap:0 PCMU/8000\r\n"); +// }else { +// if (mediaTransmissionTCP) { +// content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); +// }else { +// content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); +// } +// content.append("a=rtpmap:8 PCMA/8000\r\n"); +// } +// } +// if (sendRtpItem.isTcp()) { +// content.append("a=connection:new\r\n"); +// if (!sendRtpItem.isTcpActive()) { +// content.append("a=setup:active\r\n"); +// }else { +// content.append("a=setup:passive\r\n"); +// } +// } +// content.append("a=sendonly\r\n"); +// content.append("y="+ finalSsrc + "\r\n"); +// content.append("f=v/////a/1/8/1\r\n"); +// +// ParentPlatform parentPlatform = new ParentPlatform(); +// parentPlatform.setServerIP(device.getIp()); +// parentPlatform.setServerPort(device.getPort()); +// parentPlatform.setServerGBId(device.getDeviceId()); +// try { +// responseSdpAck(evt, content.toString(), parentPlatform); +// Dialog dialog = evt.getDialog(); +// audioBroadcastCatch.setDialog((SIPDialog) dialog); +// audioBroadcastCatch.setRequest((SIPRequest) request); +// audioBroadcastManager.update(audioBroadcastCatch); +// } catch (SipException e) { +// throw new RuntimeException(e); +// } catch (InvalidArgumentException e) { +// throw new RuntimeException(e); +// } catch (ParseException e) { +// throw new RuntimeException(e); +// } +// }else { // 娴佷笉瀛樺湪鏃剁洃鍚祦涓婄嚎 // 璁剧疆绛夊緟鎺ㄦ祦鐨勮秴鏃�; 榛樿20s String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); @@ -1012,7 +1073,7 @@ throw new RuntimeException(e); } }); - } +// } String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); wvpResult.setCode(0); @@ -1020,7 +1081,7 @@ AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); audioBroadcastResult.setApp(app); audioBroadcastResult.setStream(stream); - audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, false)); + audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false)); audioBroadcastResult.setCodec("G.711"); wvpResult.setData(audioBroadcastResult); RequestMessage requestMessage = new RequestMessage(); -- Gitblit v1.8.0