648540858
2022-09-05 aa6bce35c710750b68d4e6c53095e9be4e1afd8d
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,21 +1,43 @@
package com.genersoft.iot.vmp.service.impl;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -27,8 +49,13 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +67,14 @@
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.math.RoundingMode;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@@ -58,10 +89,16 @@
    private SIPCommander cmder;
    @Autowired
    private AudioBroadcastManager audioBroadcastManager;
    @Autowired
    private SIPCommanderFroPlatform sipCommanderFroPlatform;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private DeferredResultHolder resultHolder;
@@ -85,18 +122,28 @@
    private UserSetting userSetting;
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMHttpHookSubscribe subscribe;
    private ZlmHttpHookSubscribe subscribe;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
                           ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           Runnable timeoutCallback) {
        if (mediaServerItem == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        PlayResult playResult = new PlayResult();
        RequestMessage msg = new RequestMessage();
        String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
@@ -104,30 +151,22 @@
        String uuid = UUID.randomUUID().toString();
        msg.setId(uuid);
        playResult.setUuid(uuid);
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
        DeferredResult<WVPResult<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
        playResult.setResult(result);
        // 录像查询以channelId作为deviceId查询
        resultHolder.put(key, uuid, result);
        if (mediaServerItem == null) {
            WVPResult wvpResult = new WVPResult();
            wvpResult.setCode(-1);
            wvpResult.setMsg("未找到可用的zlm");
            msg.setData(wvpResult);
            resultHolder.invokeResult(msg);
            return playResult;
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        playResult.setDevice(device);
        result.onCompletion(()->{
            // 点播结束时调用截图接口
            // TODO 应该在上流时调用更好,结束也可能是错误结束
            String path =  "snap";
            String fileName =  deviceId + "_" + channelId + ".jpg";
            ResponseEntity responseEntity =  (ResponseEntity)result.getResult();
            if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
                WVPResult wvpResult = (WVPResult)responseEntity.getBody();
            taskExecutor.execute(()->{
                // TODO 应该在上流时调用更好,结束也可能是错误结束
                String path =  "snap";
                String fileName =  deviceId + "_" + channelId + ".jpg";
                WVPResult wvpResult =  (WVPResult)result.getResult();
                if (Objects.requireNonNull(wvpResult).getCode() == 0) {
                    StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
                    MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
@@ -136,13 +175,13 @@
                    logger.info("[请求截图]: " + fileName);
                    zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
                }
            }
            });
        });
        if (streamInfo != null) {
            String streamId = streamInfo.getStream();
            if (streamId == null) {
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                wvpResult.setCode(ErrorCode.ERROR100.getCode());
                wvpResult.setMsg("点播失败, redis缓存streamId等于null");
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
@@ -156,8 +195,8 @@
                if (rtpInfo.getBoolean("exist")) {
                    WVPResult wvpResult = new WVPResult();
                    wvpResult.setCode(0);
                    wvpResult.setMsg("success");
                    wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                    wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    wvpResult.setData(streamInfo);
                    msg.setData(wvpResult);
@@ -175,9 +214,7 @@
                redisCatchStorage.stopPlay(streamInfo);
                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                streamInfo = null;
            }
        }
        if (streamInfo == null) {
            String streamId = null;
@@ -185,6 +222,7 @@
                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
            }
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
            logger.info(JSONObject.toJSONString(ssrcInfo));
            play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
                if (hookEvent != null) {
                    hookEvent.response(mediaServerItem, response);
@@ -192,7 +230,7 @@
            }, event -> {
                // sip error错误
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                wvpResult.setCode(ErrorCode.ERROR100.getCode());
                wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
                msg.setData(wvpResult);
                resultHolder.invokeAllResult(msg);
@@ -202,7 +240,7 @@
            }, (code, msgStr)->{
                // invite点播超时
                WVPResult wvpResult = new WVPResult();
                wvpResult.setCode(-1);
                wvpResult.setCode(ErrorCode.ERROR100.getCode());
                if (code == 0) {
                    wvpResult.setMsg("点播超时,请稍候重试");
                }else if (code == 1) {
@@ -220,8 +258,8 @@
    @Override
    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                           ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                           InviteTimeOutCallback timeoutCallback, String uuid) {
                     ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                     InviteTimeOutCallback timeoutCallback, String uuid) {
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
@@ -296,16 +334,10 @@
                    // 单端口模式streamId也有变化,需要重新设置监听
                    if (!mediaServerItem.isRtpEnable()) {
                        // 添加订阅
                        JSONObject subscribeKey = new JSONObject();
                        subscribeKey.put("app", "rtp");
                        subscribeKey.put("stream", stream);
                        subscribeKey.put("regist", true);
                        subscribeKey.put("schema", "rtmp");
                        subscribeKey.put("mediaServerId", mediaServerItem.getId());
                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
                        subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                                (MediaServerItem mediaServerItemInUse, JSONObject response)->{
                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
                        subscribe.removeSubscribe(hookSubscribe);
                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
                                    logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                    dynamicTask.stop(timeOutTaskKey);
                                    // hook响应
@@ -316,7 +348,7 @@
                    // 关闭rtp server
                    mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                    // 重新开启ssrc server
                    mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false);
                    mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
                }
            }
@@ -348,15 +380,15 @@
            redisCatchStorage.startPlay(streamInfo);
            WVPResult wvpResult = new WVPResult();
            wvpResult.setCode(0);
            wvpResult.setMsg("success");
            wvpResult.setCode(ErrorCode.SUCCESS.getCode());
            wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
            wvpResult.setData(streamInfo);
            msg.setData(wvpResult);
            resultHolder.invokeAllResult(msg);
        } else {
            logger.warn("设备预览API调用失败!");
            msg.setData("设备预览API调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
            resultHolder.invokeAllResult(msg);
        }
    }
@@ -380,7 +412,7 @@
    }
    @Override
    public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
    public DeferredResult<WVPResult<StreamInfo>> playBack(String deviceId, String channelId, String startTime,
                                                           String endTime,InviteStreamCallback inviteStreamCallback,
                                                           PlayBackCallback callback) {
        Device device = storager.queryVideoDevice(deviceId);
@@ -394,7 +426,7 @@
    }
    @Override
    public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
    public DeferredResult<WVPResult<StreamInfo>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
                                                           String deviceId, String channelId, String startTime,
                                                           String endTime, InviteStreamCallback infoCallBack,
                                                           PlayBackCallback playBackCallback) {
@@ -403,24 +435,22 @@
        }
        String uuid = UUID.randomUUID().toString();
        String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
            return result;
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
        }
        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L);
        resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(key);
        RequestMessage requestMessage = new RequestMessage();
        requestMessage.setId(uuid);
        requestMessage.setKey(key);
        PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
        String  playBackTimeOutTaskKey = UUID.randomUUID().toString();
        String playBackTimeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
            logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            playBackResult.setCode(-1);
            playBackResult.setData(msg);
            playBackCallback.call(playBackResult);
            playBackResult.setCode(ErrorCode.ERROR100.getCode());
            playBackResult.setMsg("回放超时");
            playBackResult.setData(requestMessage);
            SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
            if (dialog != null) {
@@ -434,6 +464,8 @@
            cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
            // 回复之前所有的点播请求
            playBackCallback.call(playBackResult);
            result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "回放超时"));
            resultHolder.exist(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid);
        }, userSetting.getPlayTimeout());
        cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
@@ -443,24 +475,26 @@
                    StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
                    if (streamInfo == null) {
                        logger.warn("设备回放API调用失败!");
                        msg.setData("设备回放API调用失败!");
                        playBackResult.setCode(-1);
                        playBackResult.setData(msg);
                        playBackResult.setCode(ErrorCode.ERROR100.getCode());
                        playBackResult.setMsg("设备回放API调用失败!");
                        playBackCallback.call(playBackResult);
                        return;
                    }
                    redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
                    msg.setData(JSON.toJSONString(streamInfo));
                    playBackResult.setCode(0);
                    playBackResult.setData(msg);
                    WVPResult<StreamInfo> success = WVPResult.success(streamInfo);
                    requestMessage.setData(success);
                    playBackResult.setCode(ErrorCode.SUCCESS.getCode());
                    playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    playBackResult.setData(requestMessage);
                    playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
                    playBackResult.setResponse(inviteStreamInfo.getResponse());
                    playBackCallback.call(playBackResult);
                }, event -> {
                    dynamicTask.stop(playBackTimeOutTaskKey);
                    msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
                    playBackResult.setCode(-1);
                    playBackResult.setData(msg);
                    requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)));
                    playBackResult.setCode(ErrorCode.ERROR100.getCode());
                    playBackResult.setMsg(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
                    playBackResult.setData(requestMessage);
                    playBackResult.setEvent(event);
                    playBackCallback.call(playBackResult);
                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@@ -469,7 +503,7 @@
    }
    @Override
    public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
    public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            return null;
@@ -481,34 +515,34 @@
    }
    @Override
    public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
    public DeferredResult<WVPResult<StreamInfo>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
        if (mediaServerItem == null || ssrcInfo == null) {
            return null;
        }
        String uuid = UUID.randomUUID().toString();
        String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
        DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
        DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(30000L);
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
            return result;
            throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在");
        }
        resultHolder.put(key, uuid, result);
        RequestMessage msg = new RequestMessage();
        msg.setId(uuid);
        msg.setKey(key);
        RequestMessage requestMessage = new RequestMessage();
        requestMessage.setId(uuid);
        requestMessage.setKey(key);
        WVPResult<StreamInfo> wvpResult = new WVPResult<>();
        msg.setData(wvpResult);
        requestMessage.setData(wvpResult);
        PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
        downloadResult.setData(msg);
        downloadResult.setData(requestMessage);
        String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
            logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
            wvpResult.setCode(-1);
            wvpResult.setCode(ErrorCode.ERROR100.getCode());
            wvpResult.setMsg("录像下载请求超时");
            downloadResult.setCode(-1);
            downloadResult.setCode(ErrorCode.ERROR100.getCode());
            downloadResult.setMsg("录像下载请求超时");
            hookCallBack.call(downloadResult);
            SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
            // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
@@ -540,17 +574,19 @@
                        return ;
                    }
                    redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
                    wvpResult.setCode(0);
                    wvpResult.setMsg("success");
                    wvpResult.setCode(ErrorCode.SUCCESS.getCode());
                    wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    wvpResult.setData(streamInfo);
                    downloadResult.setCode(0);
                    downloadResult.setCode(ErrorCode.SUCCESS.getCode());
                    downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
                    downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
                    downloadResult.setResponse(inviteStreamInfo.getResponse());
                    hookCallBack.call(downloadResult);
                }, event -> {
                    dynamicTask.stop(downLoadTimeOutTaskKey);
                    downloadResult.setCode(-1);
                    wvpResult.setCode(-1);
                    downloadResult.setCode(ErrorCode.ERROR100.getCode());
                    downloadResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
                    wvpResult.setCode(ErrorCode.ERROR100.getCode());
                    wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
                    downloadResult.setEvent(event);
                    hookCallBack.call(downloadResult);
@@ -574,7 +610,7 @@
                logger.warn("查询录像信息时发现节点已离线");
                return null;
            }
            if (mediaServerItem.getRecordAssistPort() != 0) {
            if (mediaServerItem.getRecordAssistPort() > 0) {
                JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
                if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                    long duration = jsonObject.getLong("data");
@@ -611,7 +647,7 @@
            resultHolder.invokeResult(msg);
        } else {
            logger.warn("设备预览API调用失败!");
            msg.setData("设备预览API调用失败!");
            msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
            resultHolder.invokeResult(msg);
        }
    }
@@ -620,7 +656,7 @@
    public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
        String streamId = resonse.getString("stream");
        JSONArray tracks = resonse.getJSONArray("tracks");
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks, null);
        streamInfo.setDeviceID(deviceId);
        streamInfo.setChannelId(channelId);
        return streamInfo;
@@ -651,7 +687,131 @@
    }
    @Override
    public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) {
        if (device == null || channelId == null) {
            return;
        }
        DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
        if (deviceChannel == null) {
            logger.warn("开启语音广播的时候未找到通道: {}", channelId);
            event.call("开启语音广播的时候未找到通道");
            return;
        }
        // 查询通道使用状态
        if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId());
                if (streamReady) {
                    logger.warn("语音广播已经开启: {}", channelId);
                    event.call("语音广播已经开启");
                    return;
                }else {
                    audioBroadcastManager.del(deviceChannel.getDeviceId(),channelId);
                    redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                }
            }
        }
        // 发送通知
        cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
            // 发送成功
            AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready);
            audioBroadcastManager.add(audioBroadcastCatch);
        }, eventResultForError -> {
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
            event.call("语音广播发送失败");
            stopAudioBroadcast(device.getDeviceId(), channelId);
        });
    }
    @Override
    public void stopAudioBroadcast(String deviceId, String channelId){
        AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
        if (audioBroadcastCatch != null) {
            try {
                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
                if (sendRtpItem != null) {
                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    Map<String, Object> param = new HashMap<>();
                    param.put("vhost", "__defaultVhost__");
                    param.put("app", sendRtpItem.getApp());
                    param.put("stream", sendRtpItem.getStreamId());
                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
                    // 立刻结束设备的推流,等待自行结束太慢
                    zlmresTfulUtils.closeStreams(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStreamId());
                }
                if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
                    cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getChannelId(), audioBroadcastCatch.getRequest(), null);
                }
                audioBroadcastManager.del(deviceId, channelId);
            } catch (SipException e) {
                throw new RuntimeException(e);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            } catch (InvalidArgumentException e) {
                throw new RuntimeException(e);
            }
        }
    }
    /**
     * Hook for a ZLM media server coming online (presumably called by the media-server
     * lifecycle handling — confirm against callers).
     * <p>
     * Currently a no-op: the body contains no executable statements. The commented-out
     * sketch below is kept as a reference for the TODO.
     *
     * @param mediaServerId id of the media server; unused at the moment
     */
    @Override
    public void zlmServerOnline(String mediaServerId) {
        // Nothing seems to be required here for now
        // TODO look up earlier play sessions; if their stream no longer exists, send BYE to the downstream device
//        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
//        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
//            Integer code = mediaList.getInteger("code");
//            if (code == 0) {
//                JSONArray data = mediaList.getJSONArray("data");
//                if (data == null || data.size() == 0) {
//                    zlmServerOffline(mediaServerId);
//                }else {
//                    Map<String, JSONObject> mediaListMap = new HashMap<>();
//                    for (int i = 0; i < data.size(); i++) {
//                        JSONObject json = data.getJSONObject(i);
//                        String app = json.getString("app");
//                        if ("rtp".equals(app)) {
//                            String stream = json.getString("stream");
//                            if (mediaListMap.get(stream) != null) {
//                                continue;
//                            }
//                            mediaListMap.put(stream, json);
//                            // Handle GB28181 devices that are currently being watched
//                            List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
//                            if (ssrcTransactions.size() > 0) {
//                                for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
//                                    if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
//                                        cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
//                                                ssrcTransaction.getStream(), null);
//                                    }
//                                }
//                            }
//                        }
//                    }
//                    if (mediaListMap.size() > 0 ) {
//                        // Handle upstream platforms that a stream is currently being pushed to
//                        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
//                        if (sendRtpItems.size() > 0) {
//                            for (SendRtpItem sendRtpItem : sendRtpItems) {
//                                if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
//                                    if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
//                                        ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
//                                        sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
//                                    }
//                                }
//                            }
//                        }
//                    }
//                }
//            }
//        }));
    }
}