648540858
2024-01-08 b15e559eae82db811b31f1e3c47e3be027f20e27
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
@@ -13,27 +14,22 @@
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPResponse;
@@ -77,16 +73,13 @@
    private IInviteStreamService inviteStreamService;
    @Autowired
    private DeferredResultHolder resultHolder;
    private ZlmHttpHookSubscribe subscribe;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private IMediaService mediaService;
@@ -105,9 +98,6 @@
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
    @Autowired
    private CloudRecordServiceMapper cloudRecordServiceMapper;
@@ -238,6 +228,15 @@
                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                    subscribe.removeSubscribe(hookSubscribe);
                }
            }else {
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
                        device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
                        ssrcInfo.getPort(), ssrcInfo.getSsrc());
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream());
                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            }
        }, userSetting.getPlayTimeout());
@@ -268,6 +267,7 @@
                InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
                        timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY);
            }, (event) -> {
                logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channelId, event.statusCode, event.msg);
                dynamicTask.stop(timeOutTaskKey);
                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                // 释放ssrc
@@ -309,7 +309,13 @@
        if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
            return;
        }
        String substring = contentString.substring(0, contentString.indexOf("y="));
        String substring;
        if (contentString.indexOf("y=") > 0) {
            substring = contentString.substring(0, contentString.indexOf("y="));
        }else {
            substring = contentString;
        }
        try {
            SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
            int port = -1;
@@ -399,7 +405,7 @@
                deviceChannel.setStreamId(streamInfo.getStream());
                storager.startPlay(deviceId, channelId, streamInfo.getStream());
            }
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream());
            if (inviteInfo != null) {
                inviteInfo.setStatus(InviteSessionStatus.ok);
@@ -424,23 +430,6 @@
        }
        if (mediaServerItem == null) {
            logger.warn("点播时未找到可使用的ZLM...");
        }
        return mediaServerItem;
    }
    @Override
    public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
        if (device == null) {
            return null;
        }
        MediaServerItem mediaServerItem;
        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
        } else {
            mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
        }
        if (mediaServerItem == null) {
            logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
        }
        return mediaServerItem;
    }
@@ -542,7 +531,6 @@
                        // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                        InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
                                playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK);
                    }, errorEvent);
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] 录像回放: {}", e.getMessage());
@@ -563,6 +551,10 @@
        ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
        String contentString = new String(responseEvent.getResponse().getRawContent());
        String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
        // 兼容回复的消息中缺少ssrc(y字段)的情况
        if (ssrcInResponse == null) {
            ssrcInResponse = ssrcInfo.getSsrc();
        }
        if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
            // ssrc 一致
            if (mediaServerItem.isRtpEnable()) {
@@ -641,13 +633,14 @@
    @Override
    public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            return;
        }
        MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
        MediaServerItem newMediaServerItem = this.getNewMediaServerItem(device);
        if (newMediaServerItem == null) {
            callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
                    InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
@@ -726,6 +719,28 @@
                        // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                        InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
                                downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);
                        // 注册录像回调事件,录像下载结束后写入下载地址
                        ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> {
                            logger.info("[录像下载] 收到录像写入磁盘消息: , {}/{}-{}",
                                    inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
                            logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookParam);
                            OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam;
                            String filePath = recordMp4HookParam.getFile_path();
                            DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
                            InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
                                    , inviteInfo.getChannelId(), inviteInfo.getStream());
                            inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                            inviteStreamService.updateInviteInfo(inviteInfoForNew);
                        };
                        HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(
                                mediaServerItem.getId(), "rtp", ssrcInfo.getStream());
                        // 设置过期时间,下载失败时自动处理订阅数据
//                        long difference = DateUtil.getDifference(startTime, endTime)/1000;
//                        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
//                        hookSubscribe.setExpires(expiresInstant);
                        subscribe.addSubscribe(hookSubscribe, hookEventForRecord);
                    });
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
@@ -741,59 +756,71 @@
    @Override
    public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
        if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
            logger.warn("[获取下载进度] 未查询到录像下载的信息");
            return null;
        }
        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
            if (inviteInfo.getStreamInfo().getProgress() == 1) {
                return inviteInfo.getStreamInfo();
            }
            // 获取当前已下载时长
            String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
            MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
            if (mediaServerItem == null) {
                logger.warn("查询录像信息时发现节点已离线");
                return null;
            }
            if (mediaServerItem.getRecordAssistPort() == 0) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "未配置Assist服务,无法完成录像下载");
            }
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
            if (ssrcTransaction == null) {
                logger.warn("[获取下载进度],未找到下载事务信息");
                return null;
            }
            // 为了支持多个数据库,这里不能使用求和函数来直接获取总数了
            List<CloudRecordItem> cloudRecordItemList = cloudRecordServiceMapper.getList(null, "rtp", inviteInfo.getStream(), null, null, ssrcTransaction.getCallId(), null);
            if (cloudRecordItemList.isEmpty()) {
                logger.warn("[获取下载进度],未找到下载视频信息");
                return null;
            }
            long duration = 0;
            for (CloudRecordItem cloudRecordItem : cloudRecordItemList) {
                duration += cloudRecordItem.getTimeLen();
            }
            if (duration == 0) {
                inviteInfo.getStreamInfo().setProgress(0);
            } else {
                String startTime = inviteInfo.getStreamInfo().getStartTime();
                String endTime = inviteInfo.getStreamInfo().getEndTime();
                long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
                long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
                BigDecimal currentCount = new BigDecimal(duration);
                BigDecimal totalCount = new BigDecimal(end - start);
                BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
                double process = divide.doubleValue();
                inviteInfo.getStreamInfo().setProgress(process);
            }
            inviteStreamService.updateInviteInfo(inviteInfo);
        if (inviteInfo.getStreamInfo().getProgress() == 1) {
            return inviteInfo.getStreamInfo();
        }
        return null;
        // 获取当前已下载时长
        String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem == null) {
            logger.warn("[获取下载进度] 查询录像信息时发现节点不存在");
            return null;
        }
        SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
        if (ssrcTransaction == null) {
            logger.warn("[获取下载进度] 下载已结束");
            return null;
        }
        JSONObject mediaListJson= zlmresTfulUtils.getMediaList(mediaServerItem, "rtp", stream);
        if (mediaListJson == null) {
            logger.warn("[获取下载进度] 从zlm查询进度失败");
            return null;
        }
        if (mediaListJson.getInteger("code") != 0) {
            logger.warn("[获取下载进度] 从zlm查询进度出现错误: {}", mediaListJson.getString("msg"));
            return null;
        }
        JSONArray data = mediaListJson.getJSONArray("data");
        if (data == null) {
            logger.warn("[获取下载进度] 从zlm查询进度时未返回数据");
            return null;
        }
        JSONObject mediaJSON = data.getJSONObject(0);
        JSONArray tracks = mediaJSON.getJSONArray("tracks");
        if (tracks.isEmpty()) {
            logger.warn("[获取下载进度] 从zlm查询进度时未返回数据");
            return null;
        }
        JSONObject jsonObject = tracks.getJSONObject(0);
        long duration = jsonObject.getLongValue("duration");
        if (duration == 0) {
            inviteInfo.getStreamInfo().setProgress(0);
        } else {
            String startTime = inviteInfo.getStreamInfo().getStartTime();
            String endTime = inviteInfo.getStreamInfo().getEndTime();
            // 此时start和end单位是秒
            long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
            long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
            BigDecimal currentCount = new BigDecimal(duration);
            BigDecimal totalCount = new BigDecimal((end - start) * 1000);
            BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
            double process = divide.doubleValue();
            if (process > 0.999) {
                process = 1.0;
            }
            inviteInfo.getStreamInfo().setProgress(process);
        }
        inviteStreamService.updateInviteInfo(inviteInfo);
        return inviteInfo.getStreamInfo();
    }
    private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {