From c551164c89f70e664b498c3a09e615928261e01a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期六, 01 七月 2023 18:33:42 +0800 Subject: [PATCH] 合并主线 --- src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 264 +++++++++++++++++++++++++++++----------------------- 1 files changed, 145 insertions(+), 119 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 38811c2..71e9b65 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; @@ -17,20 +16,17 @@ import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.service.bean.InviteErrorCallback; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -57,6 +53,7 @@ import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; +import java.io.File; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; @@ -93,7 +90,7 @@ private IInviteStreamService inviteStreamService; @Autowired - private DeferredResultHolder resultHolder; + private SendRtpPortManager sendRtpPortManager; @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -144,14 +141,13 @@ @Override - public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, InviteErrorCallback<Object> callback) { + public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback) { if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm"); } Device device = redisCatchStorage.getDevice(deviceId); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); - if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { // 鐐规挱鍙戣捣浜嗕絾鏄皻鏈垚鍔�, 浠呮敞鍐屽洖璋冪瓑寰呯粨鏋滃嵆鍙� @@ -234,8 +230,8 @@ sendRtpItem.setUsePs(false); sendRtpItem.setReceiveStream(stream + "_talk"); - - int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc, null); + String callId = SipUtils.getNewCallId(); + int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); //绔彛鑾峰彇澶辫触鐨剆srcInfo 娌℃湁蹇呰鍙戦�佺偣鎾寚浠� if (port <= 0) { logger.info("[璇煶瀵硅] 绔彛鍒嗛厤寮傚父锛宒eviceId={},channelId={}", device.getDeviceId(), channelId); @@ -263,9 +259,6 @@ } }, userSetting.getPlayTimeout()); - String callId = SipUtils.getNewCallId(); - - zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc); Map<String, Object> param = new HashMap<>(12); param.put("vhost","__defaultVhost__"); param.put("app", sendRtpItem.getApp()); @@ -292,12 +285,12 @@ // 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴� try { - cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + response.toJSONString()); + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> { + logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookParam); dynamicTask.stop(timeOutTaskKey); // TODO 鏆備笉鍋氬鐞� - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { - logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + json.toJSONString()); + }, (mediaServerItemInuse, hookParam) -> { + logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookParam); dynamicTask.stop(timeOutTaskKey); }, (event) -> { @@ -353,7 +346,7 @@ @Override public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, - InviteErrorCallback<Object> callback) { + ErrorCallback<Object> callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), @@ -361,8 +354,9 @@ null); return; } - logger.info("[鐐规挱寮�濮媇 deviceId: {}, channelId: {},鏀舵祦绔彛锛歿}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); - + logger.info("[鐐规挱寮�濮媇 deviceId: {}, channelId: {},鐮佹祦绫诲瀷锛歿}, 鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", + device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "杈呯爜娴�" : "涓荤爜娴�", ssrcInfo.getPort(), + device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); //绔彛鑾峰彇澶辫触鐨剆srcInfo 娌℃湁蹇呰鍙戦�佺偣鎾寚浠� if (ssrcInfo.getPort() <= 0) { logger.info("[鐐规挱绔彛鍒嗛厤寮傚父]锛宒eviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); @@ -377,9 +371,10 @@ } // 鍒濆鍖杛edis涓殑invite娑堟伅鐘舵�� - InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, InviteSessionStatus.ready); + inviteInfo.setSubStream(device.isSwitchPrimarySubStream()); inviteStreamService.updateInviteInfo(inviteInfo); // 瓒呮椂澶勭悊 String timeOutTaskKey = UUID.randomUUID().toString(); @@ -387,7 +382,10 @@ // 鎵ц瓒呮椂浠诲姟鏃舵煡璇㈡槸鍚﹀凡缁忔垚鍔燂紝鎴愬姛浜嗗垯涓嶆墽琛岃秴鏃朵换鍔★紝闃叉瓒呮椂浠诲姟鍙栨秷澶辫触鐨勬儏鍐� InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) { - logger.info("[鐐规挱瓒呮椂] 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); + logger.info("[鐐规挱瓒呮椂] 鏀舵祦瓒呮椂 deviceId: {}, channelId: {},鐮佹祦绫诲瀷锛歿}锛岀鍙o細{}, SSRC: {}", + device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "杈呯爜娴�" : "涓荤爜娴�", + ssrcInfo.getPort(), ssrcInfo.getSsrc()); + // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId); // if (inviteInfoForTimeout == null) { @@ -401,8 +399,8 @@ callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); + try { cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | @@ -421,11 +419,11 @@ }, userSetting.getPlayTimeout()); try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + response.toJSONString()); + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> { + logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId); if (streamInfo == null){ callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); @@ -439,7 +437,8 @@ InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); + logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId:{}, 鐮佹祦绫诲瀷锛歿}", device.getDeviceId(), + device.isSwitchPrimarySubStream() ? "杈呯爜娴�" : "涓荤爜娴�"); String streamUrl; if (mediaServerItemInuse.getRtspPort() != 0) { streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream()); @@ -505,21 +504,8 @@ logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { - // ssrc 涓嶅彲鐢� - logger.info("[鐐规挱娑堟伅] SSRC淇鏃跺彂鐜皊src涓嶅彲浣跨敤 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - // 閲婃斁ssrc - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(), - InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(), - InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null); - - return; - } + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄噸鏂拌缃洃鍚嵆鍙� if (!mediaServerItem.isRtpEnable()) { // 娣诲姞璁㈤槄 @@ -527,12 +513,12 @@ subscribe.removeSubscribe(hookSubscribe); String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); hookSubscribe.getContent().put("stream", stream); - inviteInfo.setStream(stream); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); + inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId); + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId); if (streamInfo == null){ callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); @@ -542,7 +528,7 @@ return; } callback.run(InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), null); + InviteErrorCode.SUCCESS.getMsg(), streamInfo); inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), @@ -574,6 +560,9 @@ "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null); }else { + if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { + inviteStreamService.removeInviteInfo(inviteInfo); + } ssrcInfo.setSsrc(ssrcInResponse); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); @@ -620,8 +609,10 @@ } @Override - public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) { - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); + Device device = redisCatchStorage.getDevice(deviceId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -639,9 +630,9 @@ } - private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) { - - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param; + StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { streamInfo.setStartTime(startTime); streamInfo.setEndTime(endTime); @@ -698,7 +689,7 @@ @Override public void playBack(String deviceId, String channelId, String startTime, - String endTime, InviteErrorCallback<Object> callback) { + String endTime, ErrorCallback<Object> callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { return; @@ -711,7 +702,7 @@ @Override public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, - String endTime, InviteErrorCallback<Object> callback) { + String endTime, ErrorCallback<Object> callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -727,7 +718,7 @@ device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 鍒濆鍖杛edis涓殑invite娑堟伅鐘舵�� - InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK, InviteSessionStatus.ready); inviteStreamService.updateInviteInfo(inviteInfo); @@ -760,10 +751,10 @@ inviteStreamService.removeInviteInfo(inviteInfo); }; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> { - logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + jsonObject); + ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { + logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + hookParam); dynamicTask.stop(playBackTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); if (streamInfo == null) { logger.warn("璁惧鍥炴斁API璋冪敤澶辫触锛�"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -829,17 +820,8 @@ if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[褰曞儚鍥炴斁] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { - // ssrc 涓嶅彲鐢� - logger.info("[褰曞儚鍥炴斁] SSRC淇鏃跺彂鐜皊src涓嶅彲浣跨敤 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - // 閲婃斁ssrc - dynamicTask.stop(playBackTimeOutTaskKey); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(), - InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null); - return; - } + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� if (!mediaServerItem.isRtpEnable()) { @@ -848,12 +830,12 @@ subscribe.removeSubscribe(hookSubscribe); String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); hookSubscribe.getContent().put("stream", stream); - inviteInfo.setStream(stream); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); + inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam); dynamicTask.stop(playBackTimeOutTaskKey); // hook鍝嶅簲 - hookEvent.response(mediaServerItemInUse, response); + hookEvent.response(mediaServerItemInUse, hookParam); }); } // 鏇存柊ssrc @@ -877,6 +859,10 @@ "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null); }else { + if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { + inviteStreamService.removeInviteInfo(inviteInfo); + } + ssrcInfo.setSsrc(ssrcInResponse); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); @@ -900,7 +886,7 @@ @Override - public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) { + public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { return; @@ -918,7 +904,7 @@ @Override - public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) { + public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -934,7 +920,7 @@ } logger.info("[褰曞儚涓嬭浇] deviceId: {}, channelId: {}, 涓嬭浇閫熷害锛歿}, 鏀舵祦绔彛锛歿}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 鍒濆鍖杛edis涓殑invite娑堟伅鐘舵�� - InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD, InviteSessionStatus.ready); inviteStreamService.updateInviteInfo(inviteInfo); @@ -964,10 +950,10 @@ streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); inviteStreamService.removeInviteInfo(inviteInfo); }; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> { - logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + jsonObject); + ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { + logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + hookParam); dynamicTask.stop(downLoadTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); if (streamInfo == null) { logger.warn("[褰曞儚涓嬭浇] 鑾峰彇娴佸湴鍧�淇℃伅澶辫触"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -1032,26 +1018,21 @@ if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[褰曞儚涓嬭浇] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { - // ssrc 涓嶅彲鐢� - // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(), - InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null); - return; - } + // 閲婃斁ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� if (!mediaServerItem.isRtpEnable()) { // 娣诲姞璁㈤槄 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); - hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); + String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); + hookSubscribe.getContent().put("stream", stream); + inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam); dynamicTask.stop(downLoadTimeOutTaskKey); - hookEvent.response(mediaServerItemInUse, response); + hookEvent.response(mediaServerItemInUse, hookParam); }); } @@ -1075,6 +1056,9 @@ "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null); }else { + if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { + inviteStreamService.removeInviteInfo(inviteInfo); + } ssrcInfo.setSsrc(ssrcInResponse); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); @@ -1083,6 +1067,7 @@ logger.info("[褰曞儚涓嬭浇] 鏀跺埌invite 200, 涓嬬骇鑷畾涔変簡ssrc, 浣嗘槸褰撳墠妯″紡鏃犻渶淇"); } } + inviteStreamService.updateInviteInfo(inviteInfo); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 褰曞儚涓嬭浇: {}", e.getMessage()); @@ -1141,8 +1126,9 @@ return null; } - private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) { - StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId); + private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { streamInfo.setProgress(0); streamInfo.setStartTime(startTime); @@ -1159,14 +1145,13 @@ } - public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) { - String streamId = resonse.getString("stream"); - JSONArray tracks = resonse.getJSONArray("tracks"); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null); + public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); return streamInfo; } + @Override public void zlmServerOffline(String mediaServerId) { @@ -1441,14 +1426,10 @@ @Override public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) { - // 寮�濮嬪彂娴� - // 鍙栨秷璁剧疆鐨勮秴鏃朵换鍔� -// String channelId = request.getCallIdHeader().getCallId(); - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("鏀跺埌ACK锛宺tp/{}寮�濮嬫帹娴�, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(), + logger.info("[寮�濮嬫帹娴乚 rtp/{}, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map<String, Object> param = new HashMap<>(12); param.put("vhost", "__defaultVhost__"); @@ -1474,19 +1455,15 @@ startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); }); } else { - // 濡傛灉鏄潪涓ユ牸妯″紡锛岄渶瑕佸叧闂鍙e崰鐢� + // 濡傛灉鏄弗鏍兼ā寮忥紝闇�瑕佸叧闂鍙e崰鐢� JSONObject startSendRtpStreamResult = null; if (sendRtpItem.getLocalPort() != 0) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - } else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + } else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } } else { if (sendRtpItem.isTcpActive()) { @@ -1510,7 +1487,8 @@ logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟"); } else if (jsonObject.getInteger("code") == 0) { logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); - logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), + sendRtpItem.isTcpActive()?"琚姩鍙戞祦": param.get("dst_url") + ":" + param.get("dst_port")); } else { logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}", jsonObject.getString("msg"), JSONObject.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { @@ -1578,7 +1556,7 @@ } } - talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> { + talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> { logger.info("[璇煶瀵硅] 鏀跺埌璁惧鍙戞潵鐨勬祦"); }, eventResult -> { logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{}, 閿欒鐮� {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg); @@ -1635,4 +1613,52 @@ } redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null); } + + @Override + public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) { + Device device = deviceService.getDevice(deviceId); + if (device == null) { + errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null); + return; + } + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + if (inviteInfo != null) { + if (inviteInfo.getStreamInfo() != null) { + // 宸插瓨鍦ㄧ嚎鐩存帴鎴浘 + MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + String streamUrl; + if (mediaServerItemInuse.getRtspPort() != 0) { + streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream()); + }else { + streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream()); + } + String path = "snap"; + // 璇锋眰鎴浘 + logger.info("[璇锋眰鎴浘]: " + fileName); + zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); + File snapFile = new File(path + File.separator + fileName); + if (snapFile.exists()) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile()); + }else { + errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); + } + return; + } + } + + MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + play(newMediaServerItem, deviceId, channelId, (code, msg, data)->{ + if (code == InviteErrorCode.SUCCESS.getCode()) { + InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) { + getSnap(deviceId, channelId, fileName, errorCallback); + }else { + errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); + } + }else { + errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); + } + }); + } + } -- Gitblit v1.8.0