648540858
2024-03-27 e886aa483373ccb85d964675b118ca9f0e27ee9c
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -73,9 +73,6 @@
    private AudioBroadcastManager audioBroadcastManager;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IPlayService playService;
    @Autowired
@@ -124,9 +121,6 @@
    private VideoStreamSessionManager sessionManager;
    @Autowired
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Qualifier("taskExecutor")
@@ -147,7 +141,7 @@
        taskExecutor.execute(() -> {
            List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
            if (subscribes != null && subscribes.size() > 0) {
            if (subscribes != null && !subscribes.isEmpty()) {
                for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
                    subscribe.response(null, param);
                }
@@ -166,7 +160,7 @@
    @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
    public HookResult onPlay(@RequestBody OnPlayHookParam param) {
        if (logger.isDebugEnabled()) {
            logger.debug("[ZLM HOOK] 播放鉴权:{}->{}" + param.getMediaServerId(), param);
            logger.debug("[ZLM HOOK] 播放鉴权:{}->{}", param.getMediaServerId(), param);
        }
        String mediaServerId = param.getMediaServerId();
@@ -252,11 +246,7 @@
        taskExecutor.execute(() -> {
            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
            if (subscribe != null) {
                if (mediaInfo != null) {
                    subscribe.response(mediaInfo, param);
                } else {
                    new HookResultForOnPublish(1, "zlm not register");
                }
                subscribe.response(mediaInfo, param);
            }
        });
@@ -267,7 +257,7 @@
            result.setEnable_mp4(userSetting.isRecordPushLive());
        }
        // 国标流
        if ("rtp".equals(param.getApp()) ) {
        if ("rtp".equals(param.getApp())) {
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
@@ -318,13 +308,17 @@
                    result.setEnable_audio(true);
                }
            }
        } else if (param.getApp().equals("broadcast")) {
            result.setEnable_audio(true);
        } else if (param.getApp().equals("talk")) {
            result.setEnable_audio(true);
        }
        if (param.getApp().equalsIgnoreCase("rtp")) {
            String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey);
            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
            String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream();
            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS);
            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
            if (otherRtpSendInfo != null || otherPsSendInfo != null) {
                result.setEnable_mp4(true);
            }
@@ -347,13 +341,10 @@
            logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
        }
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
        ret.put("msg", "success");
        MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
        JSONObject json = (JSONObject) JSON.toJSON(param);
        taskExecutor.execute(() -> {
            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
            MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
            if (mediaInfo == null) {
                logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId());
                return;
@@ -379,7 +370,6 @@
                    redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
                }
            }
            if ("rtsp".equals(param.getSchema())) {
                logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
                if (param.isRegist()) {
@@ -500,7 +490,7 @@
                        GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                        if (gbStream != null) {
                            if (userSetting.isUsePushingAsStatus()) {
                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
                            }
                        }
                        if (type != null) {
@@ -524,33 +514,31 @@
                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
                                Device device = deviceService.getDevice(platformId);
                                    try {
                                        if (platform != null) {
                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
                                        } else {
                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                if (audioBroadcastCatch != null) {
                                                    // 来自上级平台的停止对讲
                                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                }
                                try {
                                    if (platform != null) {
                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
                                    } else {
                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                            if (audioBroadcastCatch != null) {
                                                // 来自上级平台的停止对讲
                                                logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                            }
                                        }
                                    } catch (SipException | InvalidArgumentException | ParseException |
                                             SsrcTransactionNotFoundException e) {
                                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                    }
                                } catch (SipException | InvalidArgumentException | ParseException |
                                         SsrcTransactionNotFoundException e) {
                                    logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                                }
                            }
                        }
                    }
                }
            }
        });
        return HookResult.SUCCESS();
@@ -581,9 +569,9 @@
                }
                // 收到无人观看说明流也没有在往上级推送
                if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                            inviteInfo.getChannelId());
                    if (sendRtpItems.size() > 0) {
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
                            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                            try {
@@ -612,14 +600,14 @@
                        if (info != null) {
                            cmder.streamByeCmd(device, inviteInfo.getChannelId(),
                                    inviteInfo.getStream(), null);
                        }else {
                        } else {
                            logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), param.getStream());
                        }
                    } catch (InvalidArgumentException | ParseException | SipException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
                    }
                }else {
                } else {
                    logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), param.getStream());
                }
@@ -729,7 +717,7 @@
                    });
                }
                return result;
            }else if(s.length == 4){
            } else if (s.length == 4) {
                // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间
                String startTimeStr = s[2];
                String endTimeStr = s[3];
@@ -763,14 +751,14 @@
                if (!exist) {
                    SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null,
                            device.isSsrcCheck(),  true, 0, false, false, device.getStreamModeForParam());
                            device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
                    playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> {
                        msg.setData(new HookResult(code, message));
                        resultHolder.invokeResult(msg);
                    });
                }
                return result;
            }else {
            } else {
                defaultResult.setResult(HookResult.SUCCESS());
                return defaultResult;
            }
@@ -800,7 +788,7 @@
        logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId());
        taskExecutor.execute(() -> {
            List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
            if (subscribes != null && subscribes.size() > 0) {
            if (subscribes != null && !subscribes.isEmpty()) {
                for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
                    subscribe.response(null, zlmServerConfig);
                }
@@ -849,12 +837,11 @@
     */
    @ResponseBody
    @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
    public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam
    public HookResult onRtpServerTimeout(@RequestBody OnRtpServerTimeoutHookParam
            param) {
        logger.info("[ZLM HOOK] rtpServer收流超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
        taskExecutor.execute(() -> {
            JSONObject json = (JSONObject) JSON.toJSON(param);
            List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
            if (subscribes != null && !subscribes.isEmpty()) {
                for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {