From dea44dcd78418ed3e7f191a73cee2b81a7a0019f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 13 一月 2023 16:21:17 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' --- src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 137 +++++++++++++++++++++------------------------ 1 files changed, 63 insertions(+), 74 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 616aaad..d198882 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -45,11 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; @@ -454,6 +451,9 @@ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 鍙栨秷璁㈤槄娑堟伅鐩戝惉 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); } } }, userSetting.getPlayTimeout()); @@ -463,7 +463,6 @@ dynamicTask.stop(timeOutTaskKey); // 閲婃斁ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); RequestMessage msg = new RequestMessage(); @@ -481,7 +480,7 @@ onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInuse, response); logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); - String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream()); + String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream()); String path = "snap"; String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; // 璇锋眰鎴浘 @@ -589,14 +588,10 @@ } } - private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { - RequestMessage msg = new RequestMessage(); - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - if (!ObjectUtils.isEmpty(uuid)) { - msg.setId(uuid); - } - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) { + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>(); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -605,17 +600,16 @@ } redisCatchStorage.startPlay(streamInfo); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(ErrorCode.SUCCESS.getCode()); - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); - wvpResult.setData(streamInfo); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); + playBackResult.setCode(ErrorCode.SUCCESS.getCode()); + playBackResult.setMsg(ErrorCode.SUCCESS.getMsg()); + playBackResult.setData(streamInfo); + playBackCallback.call(playBackResult); } else { logger.warn("褰曞儚鍥炴斁璋冪敤澶辫触锛�"); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "褰曞儚鍥炴斁璋冪敤澶辫触锛�")); - resultHolder.invokeAllResult(msg); + playBackResult.setCode(ErrorCode.ERROR100.getCode()); + playBackResult.setMsg("褰曞儚鍥炴斁璋冪敤澶辫触锛�"); + playBackCallback.call(playBackResult); } } @@ -626,7 +620,7 @@ } MediaServerItem mediaServerItem; if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { - mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(); + mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); } else { mediaServerItem = mediaServerService.getOne(device.getMediaServerId()); } @@ -637,45 +631,56 @@ } @Override - public DeferredResult<WVPResult<StreamInfo>> playBack(String deviceId, String channelId, String startTime, + public MediaServerItem getNewMediaServerItemHasAssist(Device device) { + if (device == null) { + return null; + } + MediaServerItem mediaServerItem; + if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { + mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true); + } else { + mediaServerItem = mediaServerService.getOne(device.getMediaServerId()); + } + if (mediaServerItem == null) { + logger.warn("[鑾峰彇鍙敤鐨刏LM鑺傜偣]鏈壘鍒板彲浣跨敤鐨刏LM..."); + } + return mediaServerItem; + } + + @Override + public void playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback inviteStreamCallback, PlayBackCallback callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { - return null; + return; } MediaServerItem newMediaServerItem = getNewMediaServerItem(device); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true); - return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); + playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); } @Override - public DeferredResult<WVPResult<StreamInfo>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, + public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) { if (mediaServerItem == null || ssrcInfo == null) { - return null; + return; } - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; + Device device = storager.queryVideoDevice(deviceId); if (device == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "璁惧锛� " + deviceId + "涓嶅瓨鍦�"); } - DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L); - resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setId(uuid); - requestMessage.setKey(key); - PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>(); + + PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>(); String playBackTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { logger.warn(String.format("璁惧鍥炴斁瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); playBackResult.setCode(ErrorCode.ERROR100.getCode()); playBackResult.setMsg("鍥炴斁瓒呮椂"); - playBackResult.setData(requestMessage); try { cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); @@ -687,19 +692,14 @@ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } - // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 playBackCallback.call(playBackResult); - result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "鍥炴斁瓒呮椂")); - resultHolder.exist(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid); }, userSetting.getPlayTimeout()); SipSubscribe.Event errorEvent = event -> { dynamicTask.stop(playBackTimeOutTaskKey); - requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg))); playBackResult.setCode(ErrorCode.ERROR100.getCode()); playBackResult.setMsg(String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); - playBackResult.setData(requestMessage); playBackResult.setEvent(event); playBackCallback.call(playBackResult); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); @@ -717,11 +717,9 @@ return; } redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId()); - WVPResult<StreamInfo> success = WVPResult.success(streamInfo); - requestMessage.setData(success); playBackResult.setCode(ErrorCode.SUCCESS.getCode()); playBackResult.setMsg(ErrorCode.SUCCESS.getMsg()); - playBackResult.setData(requestMessage); + playBackResult.setData(streamInfo); playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); playBackResult.setResponse(inviteStreamInfo.getResponse()); playBackCallback.call(playBackResult); @@ -768,7 +766,7 @@ logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); dynamicTask.stop(playBackTimeOutTaskKey); // hook鍝嶅簲 - onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); + onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback); hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); }); } @@ -788,50 +786,45 @@ eventResult.msg = "鍛戒护鍙戦�佸け璐�"; errorEvent.response(eventResult); } - return result; } @Override - public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { + public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { - return null; + return; } - MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device); + if (newMediaServerItem == null) { + PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>(); + downloadResult.setCode(ErrorCode.ERROR100.getCode()); + downloadResult.setMsg("鏈壘鍒癮ssist鏈嶅姟"); + playBackCallback.call(downloadResult); + return; + } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, device.isSsrcCheck(), true); - return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, hookCallBack); + download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, playBackCallback); } + @Override - public DeferredResult<WVPResult<StreamInfo>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { + public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { if (mediaServerItem == null || ssrcInfo == null) { - return null; + return; } - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId; - DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L); + Device device = storager.queryVideoDevice(deviceId); if (device == null) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "璁惧锛�" + deviceId + "涓嶅瓨鍦�"); } - - resultHolder.put(key, uuid, result); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setId(uuid); - requestMessage.setKey(key); - WVPResult<StreamInfo> wvpResult = new WVPResult<>(); - requestMessage.setData(wvpResult); - PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>(); - downloadResult.setData(requestMessage); + PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>(); String downLoadTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> { logger.warn(String.format("褰曞儚涓嬭浇璇锋眰瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg("褰曞儚涓嬭浇璇锋眰瓒呮椂"); downloadResult.setCode(ErrorCode.ERROR100.getCode()); downloadResult.setMsg("褰曞儚涓嬭浇璇锋眰瓒呮椂"); hookCallBack.call(downloadResult); @@ -846,16 +839,12 @@ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } - // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 - hookCallBack.call(downloadResult); }, userSetting.getPlayTimeout()); SipSubscribe.Event errorEvent = event -> { dynamicTask.stop(downLoadTimeOutTaskKey); downloadResult.setCode(ErrorCode.ERROR100.getCode()); downloadResult.setMsg(String.format("褰曞儚涓嬭浇澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg(String.format("褰曞儚涓嬭浇澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); downloadResult.setEvent(event); hookCallBack.call(downloadResult); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); @@ -870,11 +859,9 @@ streamInfo.setStartTime(startTime); streamInfo.setEndTime(endTime); redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId()); - wvpResult.setCode(ErrorCode.SUCCESS.getCode()); - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); - wvpResult.setData(streamInfo); downloadResult.setCode(ErrorCode.SUCCESS.getCode()); downloadResult.setMsg(ErrorCode.SUCCESS.getMsg()); + downloadResult.setData(streamInfo); downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); downloadResult.setResponse(inviteStreamInfo.getResponse()); hookCallBack.call(downloadResult); @@ -886,7 +873,6 @@ eventResult.msg = "鍛戒护鍙戦�佸け璐�"; errorEvent.response(eventResult); } - return result; } @Override @@ -906,7 +892,10 @@ } if (mediaServerItem.getRecordAssistPort() > 0) { JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { + if (jsonObject == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "杩炴帴Assist鏈嶅姟澶辫触"); + } + if (jsonObject.getInteger("code") == 0) { long duration = jsonObject.getLong("data"); if (duration == 0) { -- Gitblit v1.8.0