648540858
2024-04-01 ecd1d2a414f650987579ac95ebdf848cd98d7af0
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -17,17 +17,21 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -91,13 +95,10 @@
    private IInviteStreamService inviteStreamService;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
    private HookSubscribe subscribe;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private IMediaService mediaService;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -172,6 +173,33 @@
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaDepartureEvent event) {
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
        if (!sendRtpItems.isEmpty()) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
                    String platformId = sendRtpItem.getPlatformId();
                    Device device = deviceService.getDevice(platformId);
                    try {
                        if (device != null) {
                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                if (audioBroadcastCatch != null) {
                                    // 来自上级平台的停止对讲
                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                }
                            }
                        }
                    } catch (SipException | InvalidArgumentException | ParseException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                    }
                }
            }
        }
        if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
            if (event.getStream().indexOf("_") > 0) {
                String[] streamArray = event.getStream().split("_");
@@ -188,9 +216,55 @@
                    }else if ("talk".equals(event.getApp())) {
                        stopTalk(device, channelId, false);
                    }
                }
            }
        }
    }
    /**
     * 流未找到的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaNotFoundEvent event) {
        if (!"rtp".equals(event.getApp())) {
            return;
        }
        String[] s = event.getStream().split("_");
        if ((s.length != 2 && s.length != 4)) {
            return;
        }
        String deviceId = s[0];
        String channelId = s[1];
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device == null || !device.isOnLine()) {
            return;
        }
        DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
        if (deviceChannel == null) {
            return;
        }
        if (s.length == 2) {
            logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
            play(event.getMediaServer(), deviceId, channelId, null, null);
        } else if (s.length == 4) {
            // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间
            String startTimeStr = s[2];
            String endTimeStr = s[3];
            if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
                return;
            }
            String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
            String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
            logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}",
                    event.getMediaServer().getId(), event.getSchema(),
                    event.getApp(), event.getStream(),
                    startTime, endTime
            );
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
                    device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
            playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
        }
    }
@@ -265,7 +339,7 @@
    }
    private void talk(MediaServer mediaServerItem, Device device, String channelId, String stream,
                      ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                      HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                      Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
        String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@@ -347,12 +421,12 @@
        // 查看设备是否已经在推流
        try {
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam);
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
                // TODO 暂不做处理
            }, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookParam);
            }, (hookData) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
            }, (event) -> {
@@ -462,8 +536,7 @@
                    streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                    // 取消订阅消息监听
                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                    subscribe.removeSubscribe(hookSubscribe);
                    subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
                }
            }else {
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
@@ -478,11 +551,11 @@
        }, userSetting.getPlayTimeout());
        try {
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> {
                logger.info("收到订阅消息: " + hookParam);
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> {
                logger.info("收到订阅消息: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
                // hook响应
                StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId());
                StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId());
                if (streamInfo == null){
                    callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
@@ -498,7 +571,7 @@
                        streamInfo);
                logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getChannelId(),
                        channel.getStreamIdentification());
                snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            }, (eventResult) -> {
                // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(),
@@ -624,11 +697,10 @@
        mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
    }
    public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
    public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
        StreamInfo streamInfo = null;
        Device device = redisCatchStorage.getDevice(deviceId);
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
        streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
        streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -646,9 +718,8 @@
    }
    private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
    private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            streamInfo.setStartTime(startTime);
            streamInfo.setEndTime(endTime);
@@ -657,7 +728,7 @@
                deviceChannel.setStreamId(streamInfo.getStream());
                storager.startPlay(deviceId, channelId, streamInfo.getStream());
            }
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream());
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, mediaInfo.getStream());
            if (inviteInfo != null) {
                inviteInfo.setStatus(InviteSessionStatus.ok);
@@ -763,10 +834,10 @@
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
            logger.info("收到回放订阅消息: " + hookParam);
        HookSubscribe.Event hookEvent = (hookData) -> {
            logger.info("收到回放订阅消息: " + hookData);
            dynamicTask.stop(playBackTimeOutTaskKey);
            StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
            StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
            if (streamInfo == null) {
                logger.warn("设备回放API调用失败!");
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -952,10 +1023,10 @@
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
            logger.info("[录像下载]收到订阅消息: " + hookParam);
        HookSubscribe.Event hookEvent = (hookData) -> {
            logger.info("[录像下载]收到订阅消息: " + hookData);
            dynamicTask.stop(downLoadTimeOutTaskKey);
            StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
            StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
            if (streamInfo == null) {
                logger.warn("[录像下载] 获取流地址信息失败");
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -973,26 +1044,24 @@
                                downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);
                        // 注册录像回调事件,录像下载结束后写入下载地址
                        ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> {
                        HookSubscribe.Event hookEventForRecord = (hookData) -> {
                            logger.info("[录像下载] 收到录像写入磁盘消息: , {}/{}-{}",
                                    inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
                            logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookParam);
                            OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam;
                            String filePath = recordMp4HookParam.getFile_path();
                            logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookData);
                            RecordInfo recordInfo = hookData.getRecordInfo();
                            String filePath = recordInfo.getFilePath();
                            DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
                            InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
                                    , inviteInfo.getChannelId(), inviteInfo.getStream());
                            inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                            inviteStreamService.updateInviteInfo(inviteInfoForNew);
                        };
                        HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(
                                mediaServerItem.getId(), "rtp", ssrcInfo.getStream());
                        Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
                        // 设置过期时间,下载失败时自动处理订阅数据
//                        long difference = DateUtil.getDifference(startTime, endTime)/1000;
//                        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
//                        hookSubscribe.setExpires(expiresInstant);
                        subscribe.addSubscribe(hookSubscribe, hookEventForRecord);
                        subscribe.addSubscribe(hook, hookEventForRecord);
                    });
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
@@ -1058,9 +1127,8 @@
        return inviteInfo.getStreamInfo();
    }
    private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
    private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            streamInfo.setProgress(0);
            streamInfo.setStartTime(startTime);
@@ -1077,9 +1145,8 @@
    }
    public StreamInfo onPublishHandler(MediaServer mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
        MediaInfo mediaInfo = MediaInfo.getInstance(hookParam);
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), mediaInfo, null);
    public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
        StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
        streamInfo.setDeviceID(deviceId);
        streamInfo.setChannelId(channelId);
        return streamInfo;
@@ -1144,7 +1211,7 @@
        AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
        audioBroadcastResult.setApp(app);
        audioBroadcastResult.setStream(stream);
        audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
        audioBroadcastResult.setStreamInfo(new StreamContent(mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
        audioBroadcastResult.setCodec("G.711");
        return audioBroadcastResult;
    }
@@ -1515,7 +1582,7 @@
            }
        }
        talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
        talk(mediaServerItem, device, channelId, stream, (hookData) -> {
            logger.info("[语音对讲] 收到设备发来的流");
        }, eventResult -> {
            logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);