From 82adc0cb23f3ee47322e78889cdaba57e9309000 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 21 三月 2023 15:55:24 +0800 Subject: [PATCH] 完善语音对讲级联 --- src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 636 ++++++++++++++++++++++++++++++++------------------------- 1 files changed, 360 insertions(+), 276 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 870b6a3..68c2f77 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -39,6 +39,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import gov.nist.javax.sip.message.SIPResponse; @@ -49,7 +50,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; @@ -134,8 +134,8 @@ @Override public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback) { + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback) { if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm"); } @@ -243,193 +243,147 @@ } } - @Override - public void talk(MediaServerItem mediaServerItem, Device device, String channelId, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); + private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream, + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback, AudioBroadcastEvent audioEvent) { + + String playSsrc = mediaServerItem.getSsrcConfig().getPlaySsrc(); + if (playSsrc == null) { + audioEvent.call("ssrc宸茬粡鐢ㄥ敖"); + return; } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); - logger.info("[瀵硅寮�濮媇 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setApp("talk"); + sendRtpItem.setStream(stream); + sendRtpItem.setSsrc(playSsrc); + sendRtpItem.setDeviceId(device.getDeviceId()); + sendRtpItem.setPlatformId(device.getDeviceId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setRtcp(false); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setOnlyAudio(true); + sendRtpItem.setPlayType(InviteStreamType.TALK); + sendRtpItem.setPt(8); + sendRtpItem.setStatus(1); + sendRtpItem.setTcpActive(false); + sendRtpItem.setTcp(true); + sendRtpItem.setUsePs(false); + sendRtpItem.setReceiveStream(stream + "_talk"); + + + int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc); + //绔彛鑾峰彇澶辫触鐨剆srcInfo 娌℃湁蹇呰鍙戦�佺偣鎾寚浠� + if (port <= 0) { + logger.info("[璇煶瀵硅] 绔彛鍒嗛厤寮傚父锛宒eviceId={},channelId={}", device.getDeviceId(), channelId); + audioEvent.call("绔彛鍒嗛厤寮傚父"); + return; + } + sendRtpItem.setLocalPort(port); + sendRtpItem.setPort(port); + logger.info("[璇煶瀵硅]寮�濮� deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", device.getDeviceId(), channelId, sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false); // 瓒呮椂澶勭悊 String timeOutTaskKey = UUID.randomUUID().toString(); - SSRCInfo finalSsrcInfo = ssrcInfo; - System.out.println("璁剧疆瓒呮椂浠诲姟锛� " + timeOutTaskKey); dynamicTask.startDelay(timeOutTaskKey, () -> { - logger.info("[瀵硅瓒呮椂] 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); + logger.info("[璇煶瀵硅] 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", device.getDeviceId(), channelId, sendRtpItem.getPort(), sendRtpItem.getSsrc()); timeoutCallback.run(); // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� try { - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[瀵硅瓒呮椂]锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { + cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[璇煶瀵硅]瓒呮椂锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); + } finally { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); } }, userSetting.getPlayTimeout()); - final String ssrc = ssrcInfo.getSsrc(); - final String stream = ssrcInfo.getStream(); - //绔彛鑾峰彇澶辫触鐨剆srcInfo 娌℃湁蹇呰鍙戦�佺偣鎾寚浠� - if (ssrcInfo.getPort() <= 0) { - logger.info("[瀵硅] 绔彛鍒嗛厤寮傚父锛宒eviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); - return; - } String callId = SipUtils.getNewCallId(); - boolean pushing = false; + + zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc); + Map<String, Object> param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); + param.put("recv_stream_id", sendRtpItem.getReceiveStream()); + param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000); + + zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> { + if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + logger.info("[璇煶瀵硅]澶辫触 deviceId: {}, channelId: {}", device.getDeviceId(), channelId); + audioEvent.call("澶辫触, " + jsonObject.getString("msg")); + // 鏌ョ湅鏄惁宸茬粡寤虹珛浜嗛�氶亾锛屽瓨鍦ㄥ垯鍙戦�乥ye + stopTalk(device, channelId); + } + }); + + // 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴� -// MediaItem mediaItem = zlmrtpServerFactory.getMediaInfo(mediaServerItem, "rtp",ssrcInfo.getStream()); -// if (mediaItem != null) { -// SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, -// mediaItem.getOriginSock().getPeer_ip(), mediaItem.getOriginSock().getPeer_port(), ssrcInfo.getSsrc(), device.getDeviceId(), -// device.getDeviceId(), channelId, -// false); -// -// sendRtpItem.setTcpActive(false); -// sendRtpItem.setCallId(callId); -// sendRtpItem.setPlayType(InviteStreamType.TALK); -// sendRtpItem.setStatus(1); -// sendRtpItem.setIp(mediaItem.getOriginSock().getPeer_ip()); -// sendRtpItem.setPort(mediaItem.getOriginSock().getPeer_port()); -// sendRtpItem.setTcpActive(false); -// sendRtpItem.setStreamId(ssrcInfo.getStream()); -// sendRtpItem.setApp("1000"); -// sendRtpItem.setStreamId("1000"); -// sendRtpItem.setSsrc(ssrc); -// sendRtpItem.setOnlyAudio(true); -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// -// Map<String, Object> param = new HashMap<>(12); -// param.put("vhost","__defaultVhost__"); -// param.put("app",sendRtpItem.getApp()); -// param.put("stream",sendRtpItem.getStreamId()); -// param.put("ssrc", sendRtpItem.getSsrc()); -// param.put("dst_url", sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// param.put("src_port", sendRtpItem.getLocalPort()); -// param.put("pt", sendRtpItem.getPt()); -// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); -// param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); -// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); -// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, param); -// System.out.println(2222); -// System.out.println(jsonObject); -// }else { - try { - cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("[瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + response.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // TODO 鏆備笉鍋氬鐞� - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { - logger.info("[瀵硅] 璁惧寮�濮嬫帹娴侊細 " + json.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // 鑾峰彇杩滅▼IP绔彛 浣滀负鍥炲璇煶娴佺殑鍦板潃 - String ip = json.getString("ip"); - Integer port = json.getInteger("port"); - logger.info("[璁惧寮�濮嬫帹娴乚{}/{}, 鏉ヨ嚜ip锛歿}, 绔彛锛歿}", device.getDeviceId(), channelId, ip, port); - // 鏌ョ湅骞冲彴鎺ㄦ祦鏄惁灏辩华 -// Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream); -// if (!ready) { -// try { -// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); -// } catch (InvalidArgumentException | ParseException | SipException e) { -// logger.error("[瀵硅瓒呮椂]锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); -// } catch (SsrcTransactionNotFoundException e) { -// timeoutCallback.run(); -// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); -// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); -// } -// }else { -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(), - device.getDeviceId(), channelId, - false, false); + try { + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { + logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // TODO 鏆備笉鍋氬鐞� + }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { + logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + json.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + }, (event) -> { + dynamicTask.stop(timeOutTaskKey); -// if (sendRtpItem.getLocalPort() == 0) { -// logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); -// try { -// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); -// } catch (InvalidArgumentException | ParseException | SipException e) { -// logger.error("[瀵硅瓒呮椂]锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); -// } catch (SsrcTransactionNotFoundException e) { -// timeoutCallback.run(); -// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); -// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); -// } -// return; -// } - sendRtpItem.setTcpActive(false); - sendRtpItem.setCallId(callId); - sendRtpItem.setPlayType(InviteStreamType.TALK); - sendRtpItem.setStatus(1); - sendRtpItem.setIp(ip); - sendRtpItem.setPort(port); - sendRtpItem.setTcpActive(false); - sendRtpItem.setApp("1000"); - sendRtpItem.setStreamId("1000"); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setOnlyAudio(true); - sendRtpItem.setRtcp(false); + if (event.event instanceof ResponseEvent) { + ResponseEvent responseEvent = (ResponseEvent) event.event; + if (responseEvent.getResponse() instanceof SIPResponse) { + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + sendRtpItem.setFromTag(response.getFromTag()); + sendRtpItem.setToTag(response.getToTag()); + sendRtpItem.setCallId(response.getCallIdHeader().getCallId()); redisCatchStorage.updateSendRTPSever(sendRtpItem); - Map<String, Object> param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param); - System.out.println(11111); - System.out.println(sendRtpItem.getIp() + ":" + sendRtpItem.getPort()); -// System.out.println(jsonObject); -// } + streamSession.put(device.getDeviceId(), channelId, "talk", + sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), + response, VideoStreamSessionManager.SessionType.talk); + } else { + logger.error("[璇煶瀵硅]鏀跺埌鐨勬秷鎭敊璇紝response涓嶆槸SIPResponse"); + } + } else { + logger.error("[璇煶瀵硅]鏀跺埌鐨勬秷鎭敊璇紝event涓嶆槸ResponseEvent"); + } - }, (event) -> { - - }, (event) -> { - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - errorEvent.response(event); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - - logger.error("[鍛戒护鍙戦�佸け璐 瀵硅娑堟伅: {}", e.getMessage()); + }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); + errorEvent.response(event); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); - eventResult.msg = "鍛戒护鍙戦�佸け璐�"; - errorEvent.response(eventResult); - } + logger.error("[鍛戒护鍙戦�佸け璐 瀵硅娑堟伅: {}", e.getMessage()); + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); + eventResult.msg = "鍛戒护鍙戦�佸け璐�"; + errorEvent.response(eventResult); + } // } } + + @Override public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, @@ -446,7 +400,8 @@ // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� try { cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { logger.error("[鐐规挱瓒呮椂]锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); } finally { timeoutCallback.run(1, "鏀舵祦瓒呮椂"); @@ -454,6 +409,9 @@ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 鍙栨秷璁㈤槄娑堟伅鐩戝惉 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); } } }, userSetting.getPlayTimeout()); @@ -463,7 +421,6 @@ dynamicTask.stop(timeOutTaskKey); // 閲婃斁ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); RequestMessage msg = new RequestMessage(); @@ -481,7 +438,7 @@ onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInuse, response); logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); - String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream()); + String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream()); String path = "snap"; String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; // 璇锋眰鎴浘 @@ -533,7 +490,7 @@ // 鍏抽棴rtp server mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 閲嶆柊寮�鍚痵src server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort(), false); } } @@ -589,14 +546,10 @@ } } - private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { - RequestMessage msg = new RequestMessage(); - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - if (!ObjectUtils.isEmpty(uuid)) { - msg.setId(uuid); - } - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) { + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>(); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -605,17 +558,16 @@ } redisCatchStorage.startPlay(streamInfo); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(ErrorCode.SUCCESS.getCode()); - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); - wvpResult.setData(streamInfo); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); + playBackResult.setCode(ErrorCode.SUCCESS.getCode()); + playBackResult.setMsg(ErrorCode.SUCCESS.getMsg()); + playBackResult.setData(streamInfo); + playBackCallback.call(playBackResult); } else { logger.warn("褰曞儚鍥炴斁璋冪敤澶辫触锛�"); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "褰曞儚鍥炴斁璋冪敤澶辫触锛�")); - resultHolder.invokeAllResult(msg); + playBackResult.setCode(ErrorCode.ERROR100.getCode()); + playBackResult.setMsg("褰曞儚鍥炴斁璋冪敤澶辫触锛�"); + playBackCallback.call(playBackResult); } } @@ -626,7 +578,7 @@ } MediaServerItem mediaServerItem; if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { - mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(); + mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); } else { mediaServerItem = mediaServerService.getOne(device.getMediaServerId()); } @@ -637,45 +589,56 @@ } @Override - public DeferredResult<WVPResult<StreamInfo>> playBack(String deviceId, String channelId, String startTime, - String endTime, InviteStreamCallback inviteStreamCallback, - PlayBackCallback callback) { - Device device = storager.queryVideoDevice(deviceId); + public MediaServerItem getNewMediaServerItemHasAssist(Device device) { if (device == null) { return null; + } + MediaServerItem mediaServerItem; + if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { + mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true); + } else { + mediaServerItem = mediaServerService.getOne(device.getMediaServerId()); + } + if (mediaServerItem == null) { + logger.warn("[鑾峰彇鍙敤鐨刏LM鑺傜偣]鏈壘鍒板彲浣跨敤鐨刏LM..."); + } + return mediaServerItem; + } + + @Override + public void playBack(String deviceId, String channelId, String startTime, + String endTime, InviteStreamCallback inviteStreamCallback, + PlayBackCallback callback) { + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + return; } MediaServerItem newMediaServerItem = getNewMediaServerItem(device); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true); - return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); + playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); } @Override - public DeferredResult<WVPResult<StreamInfo>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, - String deviceId, String channelId, String startTime, - String endTime, InviteStreamCallback infoCallBack, - PlayBackCallback playBackCallback) { + public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, + String deviceId, String channelId, String startTime, + String endTime, InviteStreamCallback infoCallBack, + PlayBackCallback playBackCallback) { if (mediaServerItem == null || ssrcInfo == null) { - return null; + return; } - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; + Device device = storager.queryVideoDevice(deviceId); if (device == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "璁惧锛� " + deviceId + "涓嶅瓨鍦�"); } - DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L); - resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setId(uuid); - requestMessage.setKey(key); - PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>(); + + PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>(); String playBackTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { logger.warn(String.format("璁惧鍥炴斁瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); playBackResult.setCode(ErrorCode.ERROR100.getCode()); playBackResult.setMsg("鍥炴斁瓒呮椂"); - playBackResult.setData(requestMessage); try { cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); @@ -687,19 +650,14 @@ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } - // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 playBackCallback.call(playBackResult); - result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "鍥炴斁瓒呮椂")); - resultHolder.exist(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid); }, userSetting.getPlayTimeout()); SipSubscribe.Event errorEvent = event -> { dynamicTask.stop(playBackTimeOutTaskKey); - requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg))); playBackResult.setCode(ErrorCode.ERROR100.getCode()); playBackResult.setMsg(String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); - playBackResult.setData(requestMessage); playBackResult.setEvent(event); playBackCallback.call(playBackResult); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); @@ -717,11 +675,9 @@ return; } redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId()); - WVPResult<StreamInfo> success = WVPResult.success(streamInfo); - requestMessage.setData(success); playBackResult.setCode(ErrorCode.SUCCESS.getCode()); playBackResult.setMsg(ErrorCode.SUCCESS.getMsg()); - playBackResult.setData(requestMessage); + playBackResult.setData(streamInfo); playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); playBackResult.setResponse(inviteStreamInfo.getResponse()); playBackCallback.call(playBackResult); @@ -768,14 +724,14 @@ logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); dynamicTask.stop(playBackTimeOutTaskKey); // hook鍝嶅簲 - onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); + onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback); hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); }); } // 鍏抽棴rtp server mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 閲嶆柊寮�鍚痵src server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort(), false); } } } @@ -788,50 +744,44 @@ eventResult.msg = "鍛戒护鍙戦�佸け璐�"; errorEvent.response(eventResult); } - return result; } - @Override - public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { + public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { - return null; + return; } - MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device); + if (newMediaServerItem == null) { + PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>(); + downloadResult.setCode(ErrorCode.ERROR100.getCode()); + downloadResult.setMsg("鏈壘鍒癮ssist鏈嶅姟"); + playBackCallback.call(downloadResult); + return; + } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true); - return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, hookCallBack); + download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, playBackCallback); } + @Override - public DeferredResult<WVPResult<StreamInfo>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { + public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { if (mediaServerItem == null || ssrcInfo == null) { - return null; + return; } - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId; - DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L); + Device device = storager.queryVideoDevice(deviceId); if (device == null) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "璁惧锛�" + deviceId + "涓嶅瓨鍦�"); } - - resultHolder.put(key, uuid, result); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setId(uuid); - requestMessage.setKey(key); - WVPResult<StreamInfo> wvpResult = new WVPResult<>(); - requestMessage.setData(wvpResult); - PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>(); - downloadResult.setData(requestMessage); + PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>(); String downLoadTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> { logger.warn(String.format("褰曞儚涓嬭浇璇锋眰瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg("褰曞儚涓嬭浇璇锋眰瓒呮椂"); downloadResult.setCode(ErrorCode.ERROR100.getCode()); downloadResult.setMsg("褰曞儚涓嬭浇璇锋眰瓒呮椂"); hookCallBack.call(downloadResult); @@ -846,16 +796,12 @@ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } - // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 - hookCallBack.call(downloadResult); }, userSetting.getPlayTimeout()); SipSubscribe.Event errorEvent = event -> { dynamicTask.stop(downLoadTimeOutTaskKey); downloadResult.setCode(ErrorCode.ERROR100.getCode()); downloadResult.setMsg(String.format("褰曞儚涓嬭浇澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg(String.format("褰曞儚涓嬭浇澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); downloadResult.setEvent(event); hookCallBack.call(downloadResult); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); @@ -870,11 +816,9 @@ streamInfo.setStartTime(startTime); streamInfo.setEndTime(endTime); redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId()); - wvpResult.setCode(ErrorCode.SUCCESS.getCode()); - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); - wvpResult.setData(streamInfo); downloadResult.setCode(ErrorCode.SUCCESS.getCode()); downloadResult.setMsg(ErrorCode.SUCCESS.getMsg()); + downloadResult.setData(streamInfo); downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); downloadResult.setResponse(inviteStreamInfo.getResponse()); hookCallBack.call(downloadResult); @@ -886,7 +830,6 @@ eventResult.msg = "鍛戒护鍙戦�佸け璐�"; errorEvent.response(eventResult); } - return result; } @Override @@ -906,7 +849,10 @@ } if (mediaServerItem.getRecordAssistPort() > 0) { JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { + if (jsonObject == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "杩炴帴Assist鏈嶅姟澶辫触"); + } + if (jsonObject.getInteger("code") == 0) { long duration = jsonObject.getLong("data"); if (duration == 0) { @@ -985,7 +931,7 @@ cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { + SsrcTransactionNotFoundException e) { logger.error("[zlm绂荤嚎]涓烘鍦ㄤ娇鐢ㄦzlm鐨勮澶囷紝 鍙戦�丅YE澶辫触 {}", e.getMessage()); } } @@ -994,7 +940,8 @@ } @Override - public AudioBroadcastResult audioBroadcast(Device device, String channelId) { + public AudioBroadcastResult audioBroadcast(Device device, String channelId, Boolean broadcastMode) { + // TODO 蹇呴』澶氱鍙fā寮忔墠鏀寔璇煶鍠婅瘽楣よ闊冲璁� if (device == null || channelId == null) { return null; } @@ -1004,53 +951,64 @@ logger.warn("寮�鍚闊冲箍鎾殑鏃跺�欐湭鎵惧埌閫氶亾锛� {}", channelId); return null; } - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(); - String app = "broadcast"; - // TODO 浠巗ip user agent涓垽鏂槸浠�涔堝搧鐗岃澶囷紝澶у崕榛樿浣跨敤talk妯″紡锛屽叾浠栦娇鐢╞roadcast妯″紡 -// String app = "talk"; + MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + if (broadcastMode == null) { + broadcastMode = true; + } + String app = broadcastMode?"broadcast":"talk"; String stream = device.getDeviceId() + "_" + channelId; - StreamInfo broadcast = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "broadcast", stream, null, null, null, false); AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); audioBroadcastResult.setApp(app); audioBroadcastResult.setStream(stream); - audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false)); + audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false))); audioBroadcastResult.setCodec("G.711"); return audioBroadcastResult; } @Override - public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { + public boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { - return; + return false; } logger.info("[璇煶鍠婅瘽] device锛� {}, channel: {}", device.getDeviceId(), channelId); DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); if (deviceChannel == null) { logger.warn("寮�鍚闊冲箍鎾殑鏃跺�欐湭鎵惧埌閫氶亾锛� {}", channelId); event.call("寮�鍚闊冲箍鎾殑鏃跺�欐湭鎵惧埌閫氶亾"); - return; + return false; } // 鏌ヨ閫氶亾浣跨敤鐘舵�� if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬� - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady) { logger.warn("璇煶骞挎挱宸茬粡寮�鍚細 {}", channelId); event.call("璇煶骞挎挱宸茬粡寮�鍚�"); - return; + return false; } else { stopAudioBroadcast(device.getDeviceId(), channelId); } + } + } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null) { + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); + if (streamReady) { + logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId); + event.call("璇煶瀵硅杩涜涓�"); + return false; + } else { + stopTalk(device, channelId); } } // 鍙戦�侀�氱煡 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 鍙戦�佹垚鍔� - AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform); audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 鍙戦�佸け璐� @@ -1058,22 +1016,40 @@ event.call("璇煶骞挎挱鍙戦�佸け璐�"); stopAudioBroadcast(device.getDeviceId(), channelId); }); + return true; } + @Override + public boolean audioBroadcastInUse(Device device, String channelId) { + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬� + MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (streamReady) { + logger.warn("璇煶骞挎挱閫氶亾浣跨敤涓細 {}", channelId); + return true; + } + } + } + return false; + } @Override public void stopAudioBroadcast(String deviceId, String channelId) { + logger.info("[鍋滄瀵硅] 璁惧锛歿}, 閫氶亾锛歿}", deviceId, channelId); List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>(); if (channelId == null) { audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); - }else { + } else { audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); } if (audioBroadcastCatchList.size() > 0) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { Device device = deviceService.getDevice(deviceId); - if (device == null || audioBroadcastCatch == null ) { + if (device == null || audioBroadcastCatch == null) { return; } SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); @@ -1083,14 +1059,21 @@ Map<String, Object> param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); + param.put("stream", sendRtpItem.getStream()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); + try { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[娑堟伅鍙戦�佸け璐 鍙戦�佽闊冲枈璇滲YE澶辫触"); + } } audioBroadcastManager.del(deviceId, channelId); } } } + @Override public void zlmServerOnline(String mediaServerId) { @@ -1201,12 +1184,12 @@ String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("鏀跺埌ACK锛宺tp/{}寮�濮嬪悜涓婄骇鎺ㄦ祦, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStreamId(), + logger.info("鏀跺埌ACK锛宺tp/{}寮�濮嬫帹娴�, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map<String, Object> param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("src_port", sendRtpItem.getLocalPort()); param.put("pt", sendRtpItem.getPt()); @@ -1215,12 +1198,12 @@ param.put("is_udp", is_Udp); if (!sendRtpItem.isTcp()) { // udp妯″紡涓嬪紑鍚痳tcp淇濇椿 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { @@ -1235,16 +1218,16 @@ if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { if (sendRtpItem.isTcpActive()) { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { + } else { param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } } - }else { + } else { if (sendRtpItem.isTcpActive()) { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { + } else { param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); @@ -1262,10 +1245,10 @@ if (jsonObject == null) { logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟"); } else if (jsonObject.getInteger("code") == 0) { - logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); - logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); + logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } else { - logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"), JSON.toJSONString(param)); + logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}", jsonObject.getString("msg"), JSON.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); @@ -1277,7 +1260,7 @@ logger.error("[鍛戒护鍙戦�佸け璐 鍋滄璇煶瀵硅: {}", e.getMessage()); } } - }else { + } else { // 鍚戜笂绾у钩鍙� try { commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); @@ -1287,4 +1270,105 @@ } } } + + @Override + public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event) { + if (device == null || channelId == null) { + return; + } + // TODO 蹇呴』澶氱鍙fā寮忔墠鏀寔璇煶鍠婅瘽楣よ闊冲璁� + logger.info("[璇煶瀵硅] device锛� {}, channel: {}", device.getDeviceId(), channelId); + DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); + if (deviceChannel == null) { + logger.warn("寮�鍚闊冲璁茬殑鏃跺�欐湭鎵惧埌閫氶亾锛� {}", channelId); + event.call("寮�鍚闊冲璁茬殑鏃跺�欐湭鎵惧埌閫氶亾"); + return; + } + // 鏌ヨ閫氶亾浣跨敤鐘舵�� + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬� + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (streamReady) { + logger.warn("[璇煶瀵硅] 姝e湪璇煶骞挎挱锛屾棤娉曞紑鍚闊抽�氳瘽锛� {}", channelId); + event.call("姝e湪璇煶骞挎挱"); + return; + } else { + stopAudioBroadcast(device.getDeviceId(), channelId); + } + } + } + + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null); + if (sendRtpItem != null) { + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); + if (streamReady) { + logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId); + event.call("璇煶瀵硅杩涜涓�"); + return; + } else { + stopTalk(device, channelId); + } + } + + talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> { + logger.info("[璇煶瀵硅] 鏀跺埌璁惧鍙戞潵鐨勬祦"); + }, eventResult -> { + logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{}, 閿欒鐮� {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg); + event.call("澶辫触锛岄敊璇爜 " + eventResult.statusCode + ", " + eventResult.msg); + }, () -> { + logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{} 瓒呮椂", device.getDeviceId(), channelId); + event.call("澶辫触锛岃秴鏃� "); + stopTalk(device, channelId); + }, errorMsg -> { + logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{} {}", device.getDeviceId(), channelId, errorMsg); + event.call(errorMsg); + stopTalk(device, channelId); + }); + } + + private void stopTalk(Device device, String channelId) { + stopTalk(device, channelId, null); + } + + @Override + public void stopTalk(Device device, String channelId, Boolean streamIsReady) { + logger.info("[璇煶瀵硅] 鍋滄锛� {}/{}", device.getDeviceId(), channelId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem == null) { + logger.info("[璇煶瀵硅] 鍋滄澶辫触锛� 鏈壘鍒板彂閫佷俊鎭紝鍙兘宸茬粡鍋滄"); + return; + } + // 鍋滄鍚戣澶囨帹娴� + String mediaServerId = sendRtpItem.getMediaServerId(); + if (mediaServerId == null) { + return; + } + + MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId); + + if (streamIsReady == null || streamIsReady) { + Map<String, Object> param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + zlmrtpServerFactory.stopSendRtpStream(mediaServer, param); + } + + mediaServer.getSsrcConfig().releaseSsrc(sendRtpItem.getSsrc()); + + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, null, sendRtpItem.getStream()); + if (ssrcTransaction != null) { + try { + cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.info("[璇煶瀵硅] 鍋滄娑堟伅鍙戦�佸け璐ワ紝鍙兘宸茬粡鍋滄"); + } + } + redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null); + } } -- Gitblit v1.8.0