| | |
| | | package com.genersoft.iot.vmp.service.impl; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.alibaba.fastjson2.JSONArray; |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.baomidou.dynamic.datasource.annotation.DS; |
| | | import com.genersoft.iot.vmp.common.InviteInfo; |
| | | import com.genersoft.iot.vmp.common.InviteSessionStatus; |
| | | import com.genersoft.iot.vmp.common.InviteSessionType; |
| | |
| | | import com.genersoft.iot.vmp.gb28181.bean.*; |
| | | import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; |
| | | import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; |
| | | import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; |
| | | import com.genersoft.iot.vmp.utils.CloudRecordUtils; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; |
| | | import gov.nist.javax.sip.message.SIPResponse; |
| | |
| | | |
| | | @SuppressWarnings(value = {"rawtypes", "unchecked"}) |
| | | @Service |
| | | @DS("master") |
| | | public class PlayServiceImpl implements IPlayService { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class); |
| | |
| | | HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); |
| | | subscribe.removeSubscribe(hookSubscribe); |
| | | } |
| | | }else { |
| | | logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", |
| | | device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", |
| | | ssrcInfo.getPort(), ssrcInfo.getSsrc()); |
| | | |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); |
| | | |
| | | mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream()); |
| | | streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); |
| | | } |
| | | }, userSetting.getPlayTimeout()); |
| | | |
| | |
| | | InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, |
| | | timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY); |
| | | }, (event) -> { |
| | | logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channelId, event.statusCode, event.msg); |
| | | dynamicTask.stop(timeOutTaskKey); |
| | | mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); |
| | | // 释放ssrc |
| | |
| | | if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { |
| | | return; |
| | | } |
| | | String substring = contentString.substring(0, contentString.indexOf("y=")); |
| | | |
| | | String substring; |
| | | if (contentString.indexOf("y=") > 0) { |
| | | substring = contentString.substring(0, contentString.indexOf("y=")); |
| | | }else { |
| | | substring = contentString; |
| | | } |
| | | try { |
| | | SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); |
| | | int port = -1; |
| | |
| | | deviceChannel.setStreamId(streamInfo.getStream()); |
| | | storager.startPlay(deviceId, channelId, streamInfo.getStream()); |
| | | } |
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId); |
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream()); |
| | | if (inviteInfo != null) { |
| | | inviteInfo.setStatus(InviteSessionStatus.ok); |
| | | |
| | |
| | | } |
| | | if (mediaServerItem == null) { |
| | | logger.warn("点播时未找到可使用的ZLM..."); |
| | | } |
| | | return mediaServerItem; |
| | | } |
| | | |
| | | @Override |
| | | public MediaServerItem getNewMediaServerItemHasAssist(Device device) { |
| | | if (device == null) { |
| | | return null; |
| | | } |
| | | MediaServerItem mediaServerItem; |
| | | if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { |
| | | mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true); |
| | | } else { |
| | | mediaServerItem = mediaServerService.getOne(device.getMediaServerId()); |
| | | } |
| | | if (mediaServerItem == null) { |
| | | logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM..."); |
| | | } |
| | | return mediaServerItem; |
| | | } |
| | |
| | | // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 |
| | | InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, |
| | | playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK); |
| | | |
| | | }, errorEvent); |
| | | } catch (InvalidArgumentException | SipException | ParseException e) { |
| | | logger.error("[命令发送失败] 录像回放: {}", e.getMessage()); |
| | |
| | | ResponseEvent responseEvent = (ResponseEvent) eventResult.event; |
| | | String contentString = new String(responseEvent.getResponse().getRawContent()); |
| | | String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); |
| | | // 兼容回复的消息中缺少ssrc(y字段)的情况 |
| | | if (ssrcInResponse == null) { |
| | | ssrcInResponse = ssrcInfo.getSsrc(); |
| | | } |
| | | if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { |
| | | // ssrc 一致 |
| | | if (mediaServerItem.isRtpEnable()) { |
| | |
| | | |
| | | |
| | | |
| | | |
| | | @Override |
| | | public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) { |
| | | Device device = storager.queryVideoDevice(deviceId); |
| | | if (device == null) { |
| | | return; |
| | | } |
| | | MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device); |
| | | MediaServerItem newMediaServerItem = this.getNewMediaServerItem(device); |
| | | if (newMediaServerItem == null) { |
| | | callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(), |
| | | InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(), |
| | |
| | | // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 |
| | | InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, |
| | | downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD); |
| | | |
| | | // 注册录像回调事件,录像下载结束后写入下载地址 |
| | | ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> { |
| | | logger.info("[录像下载] 收到录像写入磁盘消息: , {}/{}-{}", |
| | | inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream()); |
| | | logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookParam); |
| | | OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam; |
| | | String filePath = recordMp4HookParam.getFile_path(); |
| | | DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath); |
| | | InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId() |
| | | , inviteInfo.getChannelId(), inviteInfo.getStream()); |
| | | inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo); |
| | | inviteStreamService.updateInviteInfo(inviteInfoForNew); |
| | | }; |
| | | HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4( |
| | | mediaServerItem.getId(), "rtp", ssrcInfo.getStream()); |
| | | |
| | | // 设置过期时间,下载失败时自动处理订阅数据 |
| | | // long difference = DateUtil.getDifference(startTime, endTime)/1000; |
| | | // Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2)); |
| | | // hookSubscribe.setExpires(expiresInstant); |
| | | subscribe.addSubscribe(hookSubscribe, hookEventForRecord); |
| | | }); |
| | | } catch (InvalidArgumentException | SipException | ParseException e) { |
| | | logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); |
| | |
| | | BigDecimal totalCount = new BigDecimal((end - start) * 1000); |
| | | BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP); |
| | | double process = divide.doubleValue(); |
| | | if (process > 0.999) { |
| | | process = 1.0; |
| | | } |
| | | inviteInfo.getStreamInfo().setProgress(process); |
| | | } |
| | | inviteStreamService.updateInviteInfo(inviteInfo); |
| | | return inviteInfo.getStreamInfo(); |
| | | } |
| | | |
| | | @Override |
| | | public void getFilePath(String deviceId, String channelId, String stream, ErrorCallback<DownloadFileInfo> callback) { |
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream); |
| | | if (inviteInfo == null || inviteInfo.getStreamInfo() == null) { |
| | | logger.warn("[获取录像下载文件地址] 未查询到录像下载的信息, {}/{}-{}", deviceId, channelId, stream); |
| | | callback.run(ErrorCode.ERROR100.getCode(), "未查询到录像下载的信息", null); |
| | | return ; |
| | | } |
| | | |
| | | if (!ObjectUtils.isEmpty(inviteInfo.getStreamInfo().getDownLoadFilePath())) { |
| | | callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), |
| | | inviteInfo.getStreamInfo().getDownLoadFilePath()); |
| | | return; |
| | | } |
| | | |
| | | StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo("rtp", stream); |
| | | if (streamAuthorityInfo == null) { |
| | | logger.warn("[获取录像下载文件地址] 未查询到录像的视频信息, {}/{}-{}", deviceId, channelId, stream); |
| | | callback.run(ErrorCode.ERROR100.getCode(), "未查询到录像的视频信息", null); |
| | | return ; |
| | | } |
| | | |
| | | // 获取当前已下载时长 |
| | | String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId(); |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); |
| | | if (mediaServerItem == null) { |
| | | logger.warn("[获取录像下载文件地址] 查询录像信息时发现节点不存在, {}/{}-{}", deviceId, channelId, stream); |
| | | callback.run(ErrorCode.ERROR100.getCode(), "查询录像信息时发现节点不存在", null); |
| | | return ; |
| | | } |
| | | |
| | | List<CloudRecordItem> cloudRecordItemList = cloudRecordServiceMapper.getListByCallId(streamAuthorityInfo.getCallId()); |
| | | if (!cloudRecordItemList.isEmpty()) { |
| | | String filePath = cloudRecordItemList.get(0).getFilePath(); |
| | | |
| | | DownloadFileInfo downloadFileInfo = getDownloadFilePath(mediaServerItem, filePath); |
| | | inviteInfo.getStreamInfo().setDownLoadFilePath(downloadFileInfo); |
| | | inviteStreamService.updateInviteInfo(inviteInfo); |
| | | callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), downloadFileInfo); |
| | | }else { |
| | | // 可能尚未生成,那就监听hook等着收到对应的录像通知 |
| | | ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { |
| | | logger.info("[录像下载]收到订阅消息: , {}/{}-{}", deviceId, channelId, stream); |
| | | logger.info("[录像下载]收到订阅消息内容: " + hookParam); |
| | | dynamicTask.stop(streamAuthorityInfo.getCallId()); |
| | | OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam; |
| | | String filePath = recordMp4HookParam.getFile_path(); |
| | | DownloadFileInfo downloadFileInfo = getDownloadFilePath(mediaServerItem, filePath); |
| | | inviteInfo.getStreamInfo().setDownLoadFilePath(downloadFileInfo); |
| | | inviteStreamService.updateInviteInfo(inviteInfo); |
| | | callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), downloadFileInfo); |
| | | }; |
| | | HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(mediaServerId, "rtp", stream); |
| | | subscribe.addSubscribe(hookSubscribe, hookEvent); |
| | | |
| | | // 设置超时,超时结束监听 |
| | | dynamicTask.startDelay(streamAuthorityInfo.getCallId(), ()->{ |
| | | logger.info("[录像下载] 接收hook超时, {}/{}-{}", deviceId, channelId, stream); |
| | | subscribe.removeSubscribe(hookSubscribe); |
| | | callback.run(ErrorCode.ERROR100.getCode(), "接收hook超时", null); |
| | | }, 10000); |
| | | } |
| | | } |
| | | |
| | | private DownloadFileInfo getDownloadFilePath(MediaServerItem mediaServerItem, String filePath) { |
| | | DownloadFileInfo downloadFileInfo = new DownloadFileInfo(); |
| | | |
| | | String pathTemplate = "%s://%s:%s/index/api/downloadFile?file_path=" + filePath; |
| | | |
| | | downloadFileInfo.setHttpPath(String.format(pathTemplate, "http", mediaServerItem.getStreamIp(), |
| | | mediaServerItem.getHttpPort())); |
| | | |
| | | if (mediaServerItem.getHttpSSlPort() > 0) { |
| | | downloadFileInfo.setHttpsPath(String.format(pathTemplate, "https", mediaServerItem.getStreamIp(), |
| | | mediaServerItem.getHttpSSlPort())); |
| | | } |
| | | return downloadFileInfo; |
| | | } |
| | | |
| | | private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) { |
| | |
| | | throw new ServiceException("mediaServer不存在"); |
| | | } |
| | | // zlm 暂停RTP超时检查 |
| | | JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId); |
| | | // 使用zlm中的流ID |
| | | String streamKey = inviteInfo.getStream(); |
| | | if (!mediaServerItem.isRtpEnable()) { |
| | | streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase(); |
| | | } |
| | | JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamKey); |
| | | if (jsonObject == null || jsonObject.getInteger("code") != 0) { |
| | | throw new ServiceException("暂停RTP接收失败"); |
| | | } |
| | |
| | | throw new ServiceException("mediaServer不存在"); |
| | | } |
| | | // zlm 暂停RTP超时检查 |
| | | JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId); |
| | | // 使用zlm中的流ID |
| | | String streamKey = inviteInfo.getStream(); |
| | | if (!mediaServerItem.isRtpEnable()) { |
| | | streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase(); |
| | | } |
| | | JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamKey); |
| | | if (jsonObject == null || jsonObject.getInteger("code") != 0) { |
| | | throw new ServiceException("继续RTP接收失败"); |
| | | } |