648540858
2023-08-07 e898c344aaa515b4fe16ae7ce3d979160d1e962b
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -11,11 +10,12 @@
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@@ -59,15 +59,6 @@
    private ParentPlatformMapper platformMapper;
    @Autowired
    private PlatformCatalogMapper catalogMapper;
    @Autowired
    private PlatformChannelMapper platformChannelMapper;
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
@@ -83,7 +74,7 @@
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SubscribeHolder subscribeHolder;
@@ -364,7 +355,7 @@
                param.put("vhost", "__defaultVhost__");
                param.put("app", sendRtpItem.getApp());
                param.put("stream", sendRtpItem.getStream());
                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
            }
        }
    }
@@ -436,16 +427,17 @@
            // 如果zlm不存在这个流,则删除数据即可
            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
            if (mediaServerItemForStreamInfo != null) {
                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                Boolean ready = zlmServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                if (!ready) {
                    // 错误存在于redis中的数据
                    inviteStreamService.removeInviteInfo(inviteInfo);
                }else {
                    // 流确实尚在推流,直接回调结果
                    JSONObject json = new JSONObject();
                    json.put("app", inviteInfo.getStreamInfo().getApp());
                    json.put("stream", inviteInfo.getStreamInfo().getStream());
                    hookEvent.response(mediaServerItemForStreamInfo, json);
                    OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam();
                    hookParam.setApp(inviteInfo.getStreamInfo().getApp());
                    hookParam.setStream(inviteInfo.getStreamInfo().getStream());
                    hookEvent.response(mediaServerItemForStreamInfo, hookParam);
                    return;
                }
            }
@@ -498,14 +490,14 @@
                }
            }
        }, userSetting.getPlayTimeout());
        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{
            logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
            dynamicTask.stop(timeOutTaskKey);
            // hook响应
            playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
            playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId);
            // 收到流
            if (hookEvent != null) {
                hookEvent.response(mediaServerItem, response);
                hookEvent.response(mediaServerItem, hookParam);
            }
        }, event -> {
            // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc
@@ -524,30 +516,20 @@
                logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                if (!mediaServerItem.isRtpEnable()) {
                    logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                    if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
                        // ssrc 不可用
                        // 释放ssrc
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                        streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
                        event.msg = "下级自定义了ssrc,但是此ssrc不可用";
                        event.statusCode = 400;
                        errorEvent.response(event);
                        return;
                    }
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    // 单端口模式streamId也有变化,需要重新设置监听
                    if (!mediaServerItem.isRtpEnable()) {
                        // 添加订阅
                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                        subscribe.removeSubscribe(hookSubscribe);
                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                        subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
                            dynamicTask.stop(timeOutTaskKey);
                            // hook响应
                            playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
                            hookEvent.response(mediaServerItemInUse, response);
                            playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
                            hookEvent.response(mediaServerItemInUse, hookParam);
                        });
                    }
                    // 关闭rtp server