648540858
2023-12-05 ad36354ef46a31f24b2583263f575d6736c0ad28
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
old mode 100644 new mode 100755
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
@@ -21,17 +23,15 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPResponse;
@@ -75,16 +75,13 @@
    private IInviteStreamService inviteStreamService;
    @Autowired
    private DeferredResultHolder resultHolder;
    private ZlmHttpHookSubscribe subscribe;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private IMediaService mediaService;
@@ -105,7 +102,7 @@
    private DynamicTask dynamicTask;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
    private CloudRecordServiceMapper cloudRecordServiceMapper;
    @Override
@@ -158,10 +155,7 @@
                }
            }
        }
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
        }
        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);;
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, device.getStreamModeForParam());
        if (ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
@@ -457,16 +451,13 @@
            logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
        }
        String stream = null;
        if (newMediaServerItem.isRtpEnable()) {
            String startTimeStr = startTime.replace("-", "")
                    .replace(":", "")
                    .replace(" ", "");
            String endTimeTimeStr = endTime.replace("-", "")
                    .replace(":", "")
                    .replace(" ", "");
            stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
        }
        String startTimeStr = startTime.replace("-", "")
                .replace(":", "")
                .replace(" ", "");
        String endTimeTimeStr = endTime.replace("-", "")
                .replace(":", "")
                .replace(" ", "");
        String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false, device.getStreamModeForParam());
        playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
    }
@@ -628,44 +619,13 @@
                if (ssrcInResponse != null) {
                    // 单端口
                    // 重新订阅流上线
                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp",
                            ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                    subscribe.removeSubscribe(hookSubscribe);
                    SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
                            inviteInfo.getChannelId(), null, inviteInfo.getStream());
                    streamSession.remove(inviteInfo.getDeviceId(),
                            inviteInfo.getChannelId(), inviteInfo.getStream());
                    String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
                    hookSubscribe.getContent().put("stream", stream);
                    inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
                    inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
                    streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(),
                            stream, ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
                    subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
                        logger.info("[Invite 200OK] ssrc修正后收到订阅消息: " + hookParam);
                        dynamicTask.stop(timeOutTaskKey);
                        subscribe.removeSubscribe(hookSubscribe);
                        // hook响应
                        StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
                        if (streamInfo == null){
                            callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                                    InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                            inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
                                    InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                                    InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                            return;
                        }
                        callback.run(InviteErrorCode.SUCCESS.getCode(),
                                InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                        inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
                                InviteErrorCode.SUCCESS.getCode(),
                                InviteErrorCode.SUCCESS.getMsg(),
                                streamInfo);
                        if (inviteSessionType == InviteSessionType.PLAY) {
                            snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream);
                        }
                    });
                            inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
                }
            }
        }
@@ -773,47 +733,147 @@
    @Override
    public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
        if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
            logger.warn("[获取下载进度] 未查询到录像下载的信息");
            return null;
        }
        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
            if (inviteInfo.getStreamInfo().getProgress() == 1) {
                return inviteInfo.getStreamInfo();
            }
            // 获取当前已下载时长
            String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
            MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
            if (mediaServerItem == null) {
                logger.warn("查询录像信息时发现节点已离线");
                return null;
            }
            if (mediaServerItem.getRecordAssistPort() > 0) {
                JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
                if (jsonObject == null) {
                    throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
                }
                if (jsonObject.getInteger("code") == 0) {
                    long duration = jsonObject.getLong("data");
                    if (duration == 0) {
                        inviteInfo.getStreamInfo().setProgress(0);
                    } else {
                        String startTime = inviteInfo.getStreamInfo().getStartTime();
                        String endTime = inviteInfo.getStreamInfo().getEndTime();
                        long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
                        long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
                        BigDecimal currentCount = new BigDecimal(duration / 1000);
                        BigDecimal totalCount = new BigDecimal(end - start);
                        BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
                        double process = divide.doubleValue();
                        inviteInfo.getStreamInfo().setProgress(process);
                    }
                    inviteStreamService.updateInviteInfo(inviteInfo);
                }
            }
        if (inviteInfo.getStreamInfo().getProgress() == 1) {
            return inviteInfo.getStreamInfo();
        }
        return null;
        // 获取当前已下载时长
        String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem == null) {
            logger.warn("[获取下载进度] 查询录像信息时发现节点不存在");
            return null;
        }
        SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
        if (ssrcTransaction == null) {
            logger.warn("[获取下载进度] 下载已结束");
            return null;
        }
        JSONObject mediaListJson= zlmresTfulUtils.getMediaList(mediaServerItem, "rtp", stream);
        if (mediaListJson == null) {
            logger.warn("[获取下载进度] 从zlm查询进度失败");
            return null;
        }
        if (mediaListJson.getInteger("code") != 0) {
            logger.warn("[获取下载进度] 从zlm查询进度出现错误: {}", mediaListJson.getString("msg"));
            return null;
        }
        JSONArray data = mediaListJson.getJSONArray("data");
        if (data == null) {
            logger.warn("[获取下载进度] 从zlm查询进度时未返回数据");
            return null;
        }
        JSONObject mediaJSON = data.getJSONObject(0);
        JSONArray tracks = mediaJSON.getJSONArray("tracks");
        if (tracks.isEmpty()) {
            logger.warn("[获取下载进度] 从zlm查询进度时未返回数据");
            return null;
        }
        JSONObject jsonObject = tracks.getJSONObject(0);
        long duration = jsonObject.getLongValue("duration");
        if (duration == 0) {
            inviteInfo.getStreamInfo().setProgress(0);
        } else {
            String startTime = inviteInfo.getStreamInfo().getStartTime();
            String endTime = inviteInfo.getStreamInfo().getEndTime();
            // 此时start和end单位是秒
            long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
            long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
            BigDecimal currentCount = new BigDecimal(duration);
            BigDecimal totalCount = new BigDecimal((end - start) * 1000);
            BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
            double process = divide.doubleValue();
            inviteInfo.getStreamInfo().setProgress(process);
        }
        inviteStreamService.updateInviteInfo(inviteInfo);
        return inviteInfo.getStreamInfo();
    }
    @Override
    public void getFilePath(String deviceId, String channelId, String stream, ErrorCallback<DownloadFileInfo> callback) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
        if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
            logger.warn("[获取录像下载文件地址] 未查询到录像下载的信息, {}/{}-{}", deviceId, channelId, stream);
            callback.run(ErrorCode.ERROR100.getCode(), "未查询到录像下载的信息", null);
            return ;
        }
        if (!ObjectUtils.isEmpty(inviteInfo.getStreamInfo().getDownLoadFilePath())) {
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(),
                    inviteInfo.getStreamInfo().getDownLoadFilePath());
            return;
        }
        StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo("rtp", stream);
        if (streamAuthorityInfo == null) {
            logger.warn("[获取录像下载文件地址] 未查询到录像的视频信息, {}/{}-{}", deviceId, channelId, stream);
            callback.run(ErrorCode.ERROR100.getCode(), "未查询到录像的视频信息", null);
            return ;
        }
        // 获取当前已下载时长
        String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem == null) {
            logger.warn("[获取录像下载文件地址] 查询录像信息时发现节点不存在, {}/{}-{}", deviceId, channelId, stream);
            callback.run(ErrorCode.ERROR100.getCode(), "查询录像信息时发现节点不存在", null);
            return ;
        }
        List<CloudRecordItem> cloudRecordItemList =  cloudRecordServiceMapper.getListByCallId(streamAuthorityInfo.getCallId());
        if (!cloudRecordItemList.isEmpty()) {
            String filePath = cloudRecordItemList.get(0).getFilePath();
            DownloadFileInfo downloadFileInfo = getDownloadFilePath(mediaServerItem, filePath);
            inviteInfo.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
            inviteStreamService.updateInviteInfo(inviteInfo);
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), downloadFileInfo);
        }else {
            // 可能尚未生成,那就监听hook等着收到对应的录像通知
            ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
                logger.info("[录像下载]收到订阅消息: , {}/{}-{}", deviceId, channelId, stream);
                logger.info("[录像下载]收到订阅消息内容: " + hookParam);
                dynamicTask.stop(streamAuthorityInfo.getCallId());
                OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam;
                String filePath = recordMp4HookParam.getFile_path();
                DownloadFileInfo downloadFileInfo = getDownloadFilePath(mediaServerItem, filePath);
                inviteInfo.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                inviteStreamService.updateInviteInfo(inviteInfo);
                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), downloadFileInfo);
            };
            HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(mediaServerId, "rtp", stream);
            subscribe.addSubscribe(hookSubscribe, hookEvent);
            // 设置超时,超时结束监听
            dynamicTask.startDelay(streamAuthorityInfo.getCallId(), ()->{
                logger.info("[录像下载] 接收hook超时, {}/{}-{}", deviceId, channelId, stream);
                subscribe.removeSubscribe(hookSubscribe);
                callback.run(ErrorCode.ERROR100.getCode(), "接收hook超时", null);
            }, 10000);
        }
    }
    private DownloadFileInfo getDownloadFilePath(MediaServerItem mediaServerItem, String filePath) {
        DownloadFileInfo downloadFileInfo = new DownloadFileInfo();
        String pathTemplate = "%s://%s:%s/index/api/downloadFile?file_path=" + filePath;
        downloadFileInfo.setHttpPath(String.format(pathTemplate, "http", mediaServerItem.getStreamIp(),
                mediaServerItem.getHttpPort()));
        if (mediaServerItem.getHttpSSlPort() > 0) {
            downloadFileInfo.setHttpsPath(String.format(pathTemplate, "https", mediaServerItem.getStreamIp(),
                    mediaServerItem.getHttpSSlPort()));
        }
        return downloadFileInfo;
    }
    private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {