648540858
2023-07-01 c551164c89f70e664b498c3a09e615928261e01a
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -16,16 +16,11 @@
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
@@ -95,7 +90,7 @@
    private IInviteStreamService inviteStreamService;
    @Autowired
    private DeferredResultHolder resultHolder;
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
@@ -235,8 +230,8 @@
        sendRtpItem.setUsePs(false);
        sendRtpItem.setReceiveStream(stream + "_talk");
        int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc, null);
        String callId = SipUtils.getNewCallId();
        int port = sendRtpPortManager.getNextPort(mediaServerItem.getId());
        //端口获取失败的ssrcInfo 没有必要发送点播指令
        if (port <= 0) {
            logger.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channelId);
@@ -264,9 +259,6 @@
            }
        }, userSetting.getPlayTimeout());
        String callId = SipUtils.getNewCallId();
        zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc);
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
@@ -293,12 +285,12 @@
        // 查看设备是否已经在推流
        try {
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString());
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam);
                dynamicTask.stop(timeOutTaskKey);
                // TODO 暂不做处理
            }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
                logger.info("[语音对讲] 设备开始推流: " + json.toJSONString());
            }, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookParam);
                dynamicTask.stop(timeOutTaskKey);
            }, (event) -> {
@@ -362,7 +354,7 @@
                    null);
            return;
        }
        logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
        logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
                device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
                device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
        //端口获取失败的ssrcInfo 没有必要发送点播指令
@@ -445,7 +437,7 @@
                        InviteErrorCode.SUCCESS.getCode(),
                        InviteErrorCode.SUCCESS.getMsg(),
                        streamInfo);
                logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(),
                logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(),
                        device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
                String streamUrl;
                if (mediaServerItemInuse.getRtspPort() != 0) {
@@ -617,10 +609,10 @@
    }
    @Override
    public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
    public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
        Device device = redisCatchStorage.getDevice(deviceId);
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -1434,14 +1426,10 @@
    @Override
    public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
        // 开始发流
        // 取消设置的超时任务
//         String channelId = request.getCallIdHeader().getCallId();
        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        logger.info("收到ACK,rtp/{}开始推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
        logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost", "__defaultVhost__");
@@ -1467,19 +1455,15 @@
                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
            });
        } else {
            // 如果是非严格模式,需要关闭端口占用
            // 如果是严格模式,需要关闭端口占用
            JSONObject startSendRtpStreamResult = null;
            if (sendRtpItem.getLocalPort() != 0) {
                HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
                hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
                    if (sendRtpItem.isTcpActive()) {
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
                    } else {
                        param.put("dst_url", sendRtpItem.getIp());
                        param.put("dst_port", sendRtpItem.getPort());
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                    }
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
                } else {
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                }
            } else {
                if (sendRtpItem.isTcpActive()) {
@@ -1503,7 +1487,8 @@
            logger.error("RTP推流失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
            logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
            logger.info("RTP推流成功[ {}/{} ],{}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
                    sendRtpItem.isTcpActive()?"被动发流": param.get("dst_url") + ":" + param.get("dst_port"));
        } else {
            logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
            if (sendRtpItem.isOnlyAudio()) {
@@ -1571,7 +1556,7 @@
            }
        }
        talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
        talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
            logger.info("[语音对讲] 收到设备发来的流");
        }, eventResult -> {
            logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);