648540858
2022-09-05 aa6bce35c710750b68d4e6c53095e9be4e1afd8d
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -4,16 +4,16 @@
import java.math.RoundingMode;
import java.util.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
@@ -23,25 +23,21 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -53,10 +49,32 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.ResourceUtils;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.math.RoundingMode;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@@ -71,10 +89,16 @@
    private SIPCommander cmder;
    @Autowired
    private AudioBroadcastManager audioBroadcastManager;
    @Autowired
    private SIPCommanderFroPlatform sipCommanderFroPlatform;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private DeferredResultHolder resultHolder;
@@ -98,10 +122,13 @@
    private UserSetting userSetting;
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMHttpHookSubscribe subscribe;
    private ZlmHttpHookSubscribe subscribe;
    @Qualifier("taskExecutor")
@@ -112,7 +139,7 @@
    @Override
    public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                           ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           Runnable timeoutCallback) {
        if (mediaServerItem == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
@@ -187,9 +214,7 @@
                redisCatchStorage.stopPlay(streamInfo);
                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                streamInfo = null;
            }
        }
        if (streamInfo == null) {
            String streamId = null;
@@ -197,6 +222,7 @@
                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            }
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
            logger.info(JSONObject.toJSONString(ssrcInfo));
            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, response);
@@ -232,8 +258,8 @@
    @Override
    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                           ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           InviteTimeOutCallback timeoutCallback, String uuid) {
                     ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                     InviteTimeOutCallback timeoutCallback, String uuid) {
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
@@ -308,7 +334,7 @@
                    // 单端口模式streamId也有变化,需要重新设置监听
                    if (!mediaServerItem.isRtpEnable()) {
                        // 添加订阅
                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
                        subscribe.removeSubscribe(hookSubscribe);
                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
@@ -362,7 +388,7 @@
            resultHolder.invokeAllResult(msg);
        } else {
            logger.warn("设备预览API调用失败!");
            msg.setData("设备预览API调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
            resultHolder.invokeAllResult(msg);
        }
    }
@@ -386,7 +412,7 @@
    }
    @Override
    public DeferredResult<String> playBack(String deviceId, String channelId, String startTime,
    public DeferredResult<WVPResult<StreamInfo>> playBack(String deviceId, String channelId, String startTime,
                                                           String endTime,InviteStreamCallback inviteStreamCallback,
                                                           PlayBackCallback callback) {
        Device device = storager.queryVideoDevice(deviceId);
@@ -400,7 +426,7 @@
    }
    @Override
    public DeferredResult<String> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
    public DeferredResult<WVPResult<StreamInfo>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
                                                           String deviceId, String channelId, String startTime,
                                                           String endTime, InviteStreamCallback infoCallBack,
                                                           PlayBackCallback playBackCallback) {
@@ -413,18 +439,18 @@
        if (device == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
        }
        DeferredResult<String> result = new DeferredResult<>(30000L);
        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L);
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(key);
        RequestMessage requestMessage = new RequestMessage();
        requestMessage.setId(uuid);
        requestMessage.setKey(key);
        PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
        String  playBackTimeOutTaskKey = UUID.randomUUID().toString();
        String playBackTimeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
            logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
            playBackCallback.call(playBackResult);
            playBackResult.setCode(ErrorCode.ERROR100.getCode());
            playBackResult.setMsg("回放超时");
            playBackResult.setData(requestMessage);
            SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
            if (dialog != null) {
@@ -438,6 +464,8 @@
            cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
            // 回复之前所有的点播请求
            playBackCallback.call(playBackResult);
            result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "回放超时"));
            resultHolder.exist(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid);
        }, userSetting.getPlayTimeout());
        cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
@@ -447,24 +475,26 @@
                    StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
                    if (streamInfo == null) {
                        logger.warn("设备回放API调用失败!");
                        msg.setData("设备回放API调用失败!");
                        playBackResult.setCode(-1);
                        playBackResult.setData(msg);
                        playBackResult.setCode(ErrorCode.ERROR100.getCode());
                        playBackResult.setMsg("设备回放API调用失败!");
                        playBackCallback.call(playBackResult);
                        return;
                    }
                    redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
                    msg.setData(JSON.toJSONString(streamInfo));
                    playBackResult.setCode(0);
                    playBackResult.setData(msg);
                    WVPResult<StreamInfo> success = WVPResult.success(streamInfo);
                    requestMessage.setData(success);
                    playBackResult.setCode(ErrorCode.SUCCESS.getCode());
                    playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    playBackResult.setData(requestMessage);
                    playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
                    playBackResult.setResponse(inviteStreamInfo.getResponse());
                    playBackCallback.call(playBackResult);
                }, event -> {
                    dynamicTask.stop(playBackTimeOutTaskKey);
                    msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
                    playBackResult.setCode(-1);
                    playBackResult.setData(msg);
                    requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)));
                    playBackResult.setCode(ErrorCode.ERROR100.getCode());
                    playBackResult.setMsg(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
                    playBackResult.setData(requestMessage);
                    playBackResult.setEvent(event);
                    playBackCallback.call(playBackResult);
                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@@ -473,7 +503,7 @@
    }
    @Override
    public DeferredResult<String> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
    public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            return null;
@@ -485,33 +515,34 @@
    }
    @Override
    public DeferredResult<String> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
    public DeferredResult<WVPResult<StreamInfo>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
        if (mediaServerItem == null || ssrcInfo == null) {
            return null;
        }
        String uuid = UUID.randomUUID().toString();
        String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
        DeferredResult<String> result = new DeferredResult<>(30000L);
        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L);
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在");
        }
        resultHolder.put(key, uuid, result);
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(key);
        RequestMessage requestMessage = new RequestMessage();
        requestMessage.setId(uuid);
        requestMessage.setKey(key);
        WVPResult<StreamInfo> wvpResult = new WVPResult<>();
        msg.setData(wvpResult);
        requestMessage.setData(wvpResult);
        PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
        downloadResult.setData(msg);
        downloadResult.setData(requestMessage);
        String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
            logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            wvpResult.setCode(ErrorCode.ERROR100.getCode());
            wvpResult.setMsg("录像下载请求超时");
            downloadResult.setCode(-1);
            downloadResult.setCode(ErrorCode.ERROR100.getCode());
            downloadResult.setMsg("录像下载请求超时");
            hookCallBack.call(downloadResult);
            SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
@@ -534,17 +565,27 @@
                    StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
                    streamInfo.setStartTime(startTime);
                    streamInfo.setEndTime(endTime);
                    if (streamInfo == null) {
                        logger.warn("录像下载API调用失败!");
                        wvpResult.setCode(-1);
                        wvpResult.setMsg("录像下载API调用失败");
                        downloadResult.setCode(-1);
                        hookCallBack.call(downloadResult);
                        return ;
                    }
                    redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
                    wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                    wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    wvpResult.setData(streamInfo);
                    downloadResult.setCode(0);
                    downloadResult.setCode(ErrorCode.SUCCESS.getCode());
                    downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
                    downloadResult.setResponse(inviteStreamInfo.getResponse());
                    hookCallBack.call(downloadResult);
                }, event -> {
                    dynamicTask.stop(downLoadTimeOutTaskKey);
                    downloadResult.setCode(-1);
                    downloadResult.setCode(ErrorCode.ERROR100.getCode());
                    downloadResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
                    wvpResult.setCode(ErrorCode.ERROR100.getCode());
                    wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
                    downloadResult.setEvent(event);
@@ -569,7 +610,7 @@
                logger.warn("查询录像信息时发现节点已离线");
                return null;
            }
            if (mediaServerItem.getRecordAssistPort() != 0) {
            if (mediaServerItem.getRecordAssistPort() > 0) {
                JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
                if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                    long duration = jsonObject.getLong("data");
@@ -606,7 +647,7 @@
            resultHolder.invokeResult(msg);
        } else {
            logger.warn("设备预览API调用失败!");
            msg.setData("设备预览API调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
            resultHolder.invokeResult(msg);
        }
    }
@@ -643,6 +684,83 @@
                }
            }
        }
    }
    @Override
    public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) {
        if (device == null || channelId == null) {
            return;
        }
        DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
        if (deviceChannel == null) {
            logger.warn("开启语音广播的时候未找到通道: {}", channelId);
            event.call("开启语音广播的时候未找到通道");
            return;
        }
        // 查询通道使用状态
        if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId());
                if (streamReady) {
                    logger.warn("语音广播已经开启: {}", channelId);
                    event.call("语音广播已经开启");
                    return;
                }else {
                    audioBroadcastManager.del(deviceChannel.getDeviceId(),channelId);
                    redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                }
            }
        }
        // 发送通知
        cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
            // 发送成功
            AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready);
            audioBroadcastManager.add(audioBroadcastCatch);
        }, eventResultForError -> {
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
            event.call("语音广播发送失败");
            stopAudioBroadcast(device.getDeviceId(), channelId);
        });
    }
    @Override
    public void stopAudioBroadcast(String deviceId, String channelId){
        AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
        if (audioBroadcastCatch != null) {
            try {
                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
                if (sendRtpItem != null) {
                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    Map<String, Object> param = new HashMap<>();
                    param.put("vhost", "__defaultVhost__");
                    param.put("app", sendRtpItem.getApp());
                    param.put("stream", sendRtpItem.getStreamId());
                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
                    // 立刻结束设备的推流,等待自行结束太慢
                    zlmresTfulUtils.closeStreams(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStreamId());
                }
                if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
                    cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getChannelId(), audioBroadcastCatch.getRequest(), null);
                }
                audioBroadcastManager.del(deviceId, channelId);
            } catch (SipException e) {
                throw new RuntimeException(e);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            } catch (InvalidArgumentException e) {
                throw new RuntimeException(e);
            }
        }
    }
    @Override
@@ -685,7 +803,7 @@
//                            for (SendRtpItem sendRtpItem : sendRtpItems) {
//                                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
//                                    if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
//                                        ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
//                                        ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
//                                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
//                                    }
//                                }