648540858
2022-03-14 bde0e13682ed75d2e8c0cb8a1fd6a96bb92f1dd8
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -16,6 +16,7 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackResult;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -27,6 +28,7 @@
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
import gov.nist.javax.sip.stack.SIPDialog;
import jdk.nashorn.internal.ir.RuntimeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,6 +38,9 @@
import org.springframework.util.ResourceUtils;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.message.Request;
import java.io.FileNotFoundException;
import java.util.*;
@@ -77,6 +82,8 @@
    @Autowired
    private UserSetup userSetup;
    @Override
@@ -141,67 +148,7 @@
                e.printStackTrace();
            }
        });
        if (streamInfo == null) {
            String streamId = null;
            if (mediaServerItem.isRtpEnable()) {
                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            }
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
            // 超时处理
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
                    if (timeoutCallback != null) {
                        timeoutCallback.run();
                    }
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(-1);
                    SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
                    if (dialog != null) {
                        wvpResult.setMsg("收流超时,请稍候重试");
                        // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                        cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
                    }else {
                        wvpResult.setMsg("点播超时,请稍候重试");
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                        mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                        streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                    }
                    msg.setData(wvpResult);
                    // 回复之前所有的点播请求
                    resultHolder.invokeAllResult(msg);
                }
            }, userSetup.getPlayTimeout());
            // 发送点播消息
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                logger.info("收到订阅消息: " + response.toJSONString());
                timer.cancel();
                onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid);
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, response);
                }
            }, (event) -> {
                timer.cancel();
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                // 点播返回sip错误
                mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
                if (errorEvent != null) {
                    errorEvent.response(event);
                }
            });
        } else {
        if (streamInfo != null) {
            String streamId = streamInfo.getStream();
            if (streamId == null) {
                WVPResult wvpResult = new WVPResult();
@@ -227,67 +174,109 @@
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
                }
            } else {
                // TODO 点播前是否重置状态
            }else {
                redisCatchStorage.stopPlay(streamInfo);
                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                String streamId2 = null;
                if (mediaServerItem.isRtpEnable()) {
                    streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
                }
                SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
                // 超时处理
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
                        if (timeoutCallback != null) {
                            timeoutCallback.run();
                        }
                        WVPResult wvpResult = new WVPResult();
                        wvpResult.setCode(-1);
                        SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
                        if (dialog != null) {
                            wvpResult.setMsg("收流超时,请稍候重试");
                            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                            cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
                        }else {
                            wvpResult.setMsg("点播超时,请稍候重试");
                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                            mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                            streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                        }
                        msg.setData(wvpResult);
                        // 回复之前所有的点播请求
                        resultHolder.invokeAllResult(msg);
                    }
                }, userSetup.getPlayTimeout());
                cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                    logger.info("收到订阅消息: " + response.toJSONString());
                    onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
                }, (event) -> {
                    mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(-1);
                    wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
                    msg.setData(wvpResult);
                    resultHolder.invokeAllResult(msg);
                });
                streamInfo = null;
            }
        }
        if (streamInfo == null) {
            String streamId = null;
            if (mediaServerItem.isRtpEnable()) {
                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            }
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, response);
                }
            }, event -> {
                // sip error错误
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
                if (errorEvent != null) {
                    errorEvent.response(event);
                }
            }, (code, msgStr)->{
                // invite点播超时
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                if (code == 0) {
                    wvpResult.setMsg("点播超时,请稍候重试");
                }else if (code == 1) {
                    wvpResult.setMsg("收流超时,请稍候重试");
                }
                msg.setData(wvpResult);
                // 回复之前所有的点播请求
                resultHolder.invokeAllResult(msg);
            }, uuid);
        }
        return playResult;
    }
    @Override
    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                           ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           InviteTimeOutCallback timeoutCallback, String uuid) {
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
        }
        if (ssrcInfo == null) {
            ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
        }
        return playResult;
        // 超时处理
        Timer timer = new Timer();
        SSRCInfo finalSsrcInfo = ssrcInfo;
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId));
                SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                if (dialog != null) {
                    timeoutCallback.run(1, "收流超时");
                    // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                    cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
                }else {
                    timeoutCallback.run(0, "点播超时");
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
                    mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                    streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                }
            }
        }, userSetup.getPlayTimeout());
        cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
            logger.info("收到订阅消息: " + response.toJSONString());
            timer.cancel();
            // hook响应
            onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
            hookEvent.response(mediaServerItemInuse, response);
        }, (event) -> {
            timer.cancel();
            mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
            errorEvent.response(event);
        });
    }
    @Override
    public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        if (uuid != null) {
            msg.setId(uuid);
        }
        msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
        if (streamInfo != null) {
@@ -297,7 +286,6 @@
                storager.startPlay(deviceId, channelId, streamInfo.getStream());
            }
            redisCatchStorage.startPlay(streamInfo);
            msg.setData(JSON.toJSONString(streamInfo));
            WVPResult wvpResult = new WVPResult();
            wvpResult.setCode(0);
@@ -329,9 +317,24 @@
        return mediaServerItem;
    }
    @Override
    public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
                                                           String endTime,InviteStreamCallback inviteStreamCallback,
                                                           PlayBackCallback callback) {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) return null;
        MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
        return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
    }
    @Override
    public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback callback) {
    public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
                                                           String deviceId, String channelId, String startTime,
                                                           String endTime, InviteStreamCallback infoCallBack,
                                                           PlayBackCallback playBackCallback) {
        if (mediaServerItem == null || ssrcInfo == null) return null;
        String uuid = UUID.randomUUID().toString();
        String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
@@ -341,13 +344,12 @@
            return result;
        }
        MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result);
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(key);
        PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
@@ -355,54 +357,62 @@
                logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
                playBackResult.setCode(-1);
                playBackResult.setData(msg);
                callback.call(playBackResult);
                playBackCallback.call(playBackResult);
                SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
                if (dialog != null) {
                    // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                    cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
                }else {
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                    streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
                }
                cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
                // 回复之前所有的点播请求
                callback.call(playBackResult);
                playBackCallback.call(playBackResult);
            }
        }, userSetup.getPlayTimeout());
        cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> {
            logger.info("收到订阅消息: " + response.toJSONString());
            timer.cancel();
            StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
            if (streamInfo == null) {
                logger.warn("设备回放API调用失败!");
                msg.setData("设备回放API调用失败!");
                playBackResult.setCode(-1);
                playBackResult.setData(msg);
                callback.call(playBackResult);
                return;
            }
            redisCatchStorage.startPlayback(streamInfo);
            msg.setData(JSON.toJSONString(streamInfo));
            playBackResult.setCode(0);
            playBackResult.setData(msg);
            playBackResult.setMediaServerItem(mediaServerItem);
            playBackResult.setResponse(response);
            callback.call(playBackResult);
        }, event -> {
            timer.cancel();
            msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
            playBackResult.setEvent(event);
            callback.call(playBackResult);
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
        });
        cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
                (InviteStreamInfo inviteStreamInfo) -> {
                    logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
                    timer.cancel();
                    StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
                    if (streamInfo == null) {
                        logger.warn("设备回放API调用失败!");
                        msg.setData("设备回放API调用失败!");
                        playBackResult.setCode(-1);
                        playBackResult.setData(msg);
                        playBackCallback.call(playBackResult);
                        return;
                    }
                    redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
                    msg.setData(JSON.toJSONString(streamInfo));
                    playBackResult.setCode(0);
                    playBackResult.setData(msg);
                    playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
                    playBackResult.setResponse(inviteStreamInfo.getResponse());
                    playBackCallback.call(playBackResult);
                }, event -> {
                    timer.cancel();
                    msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
                    playBackResult.setCode(-1);
                    playBackResult.setData(msg);
                    playBackResult.setEvent(event);
                    playBackCallback.call(playBackResult);
                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                });
        return result;
    }
    @Override
    public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
    public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
        RequestMessage msg = new RequestMessage();
        msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
        msg.setId(uuid);
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
        StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
        if (streamInfo != null) {
            redisCatchStorage.startDownload(streamInfo);
            redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
            msg.setData(JSON.toJSONString(streamInfo));
            resultHolder.invokeResult(msg);
        } else {
@@ -439,7 +449,8 @@
        if (allSsrc.size() > 0) {
            for (SsrcTransaction ssrcTransaction : allSsrc) {
                if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
                    cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
                    cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
                            ssrcTransaction.getStream(), null);
                }
            }
        }