src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java
@@ -1,11 +1,26 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -18,11 +33,56 @@ @Component public class AudioBroadcastManager { private final static Logger logger = LoggerFactory.getLogger(AudioBroadcastManager.class); @Autowired private SipConfig config; @Autowired private SIPCommander cmder; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private IDeviceService deviceService; public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>(); /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { String platformId = sendRtpItem.getPlatformId(); Device device = deviceService.getDevice(platformId); try { if (device != null) { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { AudioBroadcastCatch audioBroadcastCatch = get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null) { // 来自上级平台的停止对讲 logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); } } } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } } } } } public void update(AudioBroadcastCatch audioBroadcastCatch) { if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { audioBroadcastCatch.setChannelId(audioBroadcastCatch.getDeviceId()); src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -15,8 +15,6 @@ import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -85,7 +83,7 @@ public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream()); addCount(event.getSeverId()); addCount(event.getMediaServer().getId()); } } @@ -97,7 +95,7 @@ public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream()); removeCount(event.getSeverId()); removeCount(event.getMediaServer().getId()); } } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -4,13 +4,10 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -18,12 +15,14 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; @@ -34,9 +33,6 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -232,209 +228,6 @@ MediaDepartureEvent mediaArrivalEvent = MediaDepartureEvent.getInstance(this, param, mediaServer); applicationEventPublisher.publishEvent(mediaArrivalEvent); } return HookResult.SUCCESS(); JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); MediaServer mediaInfo = mediaServerService.getOne(param.getMediaServerId()); if (mediaInfo == null) { logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId()); return; } if (subscribe != null) { subscribe.response(mediaInfo, param); } // TODO 重构此处逻辑 if (param.isRegist()) { // 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); } else { streamAuthorityInfo.setOriginType(param.getOriginType()); streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr()); } redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } } if ("rtsp".equals(param.getSchema())) { logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); if (param.isRegist()) { mediaServerService.addCount(param.getMediaServerId()); } else { mediaServerService.removeCount(param.getMediaServerId()); } int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); if (updateStatusResult > 0) { } if ("rtp".equals(param.getApp()) && !param.isRegist()) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { inviteStreamService.removeInviteInfo(inviteInfo); storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); } } else if ("broadcast".equals(param.getApp())) { // 语音对讲推流 stream需要满足格式deviceId_channelId if (param.getStream().indexOf("_") > 0) { String[] streamArray = param.getStream().split("_"); if (streamArray.length == 2) { String deviceId = streamArray[0]; String channelId = streamArray[1]; Device device = deviceService.getDevice(deviceId); if (device != null) { if (param.isRegist()) { if (audioBroadcastManager.exit(deviceId, channelId)) { playService.stopAudioBroadcast(deviceId, channelId); } // 开启语音对讲通道 try { playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> { logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); } } else { // 流注销 playService.stopAudioBroadcast(deviceId, channelId); } } else { logger.info("[语音对讲] 未找到设备:{}", deviceId); } } } } else if ("talk".equals(param.getApp())) { // 语音对讲推流 stream需要满足格式deviceId_channelId if (param.getStream().indexOf("_") > 0) { String[] streamArray = param.getStream().split("_"); if (streamArray.length == 2) { String deviceId = streamArray[0]; String channelId = streamArray[1]; Device device = deviceService.getDevice(deviceId); if (device != null) { if (param.isRegist()) { if (audioBroadcastManager.exit(deviceId, channelId)) { playService.stopAudioBroadcast(deviceId, channelId); } // 开启语音对讲通道 playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> { logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } else { // 流注销 playService.stopTalk(device, channelId, param.isRegist()); } } else { logger.info("[语音对讲] 未找到设备:{}", deviceId); } } } } else { if (!"rtp".equals(param.getApp())) { String type = OriginType.values()[param.getOriginType()].getType(); if (param.isRegist()) { StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( param.getApp(), param.getStream()); String callId = null; if (streamAuthorityInfo != null) { callId = streamAuthorityInfo.getCallId(); } StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, param.getApp(), param.getStream(), MediaInfo.getInstance(param), callId); param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { param.setSeverId(userSetting.getServerId()); zlmMediaListManager.addPush(param); // 冗余数据,自己系统中自用 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); } } else { // 兼容流注销时类型从redis记录获取 OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( param.getApp(), param.getStream(), param.getMediaServerId()); if (onStreamChangedHookParam != null) { type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); if ("PUSH".equalsIgnoreCase(type)) { // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId()); } } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); } zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { if (userSetting.isUsePushingAsStatus()) { eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); } } if (type != null) { // 发送流变化redis消息 JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", param.getApp()); jsonObject.put("stream", param.getStream()); jsonObject.put("register", param.isRegist()); jsonObject.put("mediaServerId", param.getMediaServerId()); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } } if (!param.isRegist()) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); Device device = deviceService.getDevice(platformId); try { if (platform != null) { commanderFroPlatform.streamByeCmd(platform, sendRtpItem); redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); } else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null) { // 来自上级平台的停止对讲 logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); } } } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } } } } } } }); return HookResult.SUCCESS(); } src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -45,9 +45,9 @@ @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { } // if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { // // } } /** @@ -60,7 +60,7 @@ InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream()); if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { removeInviteInfo(inviteInfo); stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); } } } src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,9 +1,9 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -12,18 +12,20 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -31,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.sdp.*; @@ -38,10 +42,6 @@ import javax.sip.ResponseEvent; import javax.sip.SipException; import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.*; /** @@ -101,6 +101,34 @@ private IInviteStreamService inviteStreamService; /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId); try { if (platform != null) { commanderForPlatform.streamByeCmd(platform, sendRtpItem); redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } } } } } @Override public ParentPlatform queryPlatformByServerGBId(String platformGbId) { return platformMapper.getParentPlatByServerGBId(platformGbId); src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -23,11 +23,13 @@ import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -188,7 +190,6 @@ }else if ("talk".equals(event.getApp())) { stopTalk(device, channelId, false); } } } } src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -124,7 +124,7 @@ @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { updateStatus(true, event.getApp(), event.getStream()); updateStatus(false, event.getApp(), event.getStream()); } } src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -126,6 +126,7 @@ streamPushMapper.update(transform); gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId()); } // TODO 相关的事件自行管理,不需要写入ZLMMediaListManager // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); // if ( channelOnlineEventLister != null) { // try { @@ -137,6 +138,15 @@ // } // 冗余数据,自己系统中自用 redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); // 发送流变化redis消息 JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", event.getApp()); jsonObject.put("stream", event.getStream()); jsonObject.put("register", true); jsonObject.put("mediaServerId", event.getMediaServer().getId()); redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject); } /** @@ -145,7 +155,36 @@ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { // 兼容流注销时类型从redis记录获取 OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( event.getApp(), event.getStream(), event.getMediaServer().getId()); if (onStreamChangedHookParam != null) { String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); if ("PUSH".equalsIgnoreCase(type)) { // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId()); } if (type != null) { // 发送流变化redis消息 JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", event.getApp()); jsonObject.put("stream", event.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", event.getMediaServer().getId()); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } GbStream gbStream = storager.getGbStream(event.getApp(), event.getStream()); if (gbStream != null) { if (userSetting.isUsePushingAsStatus()) { storager.mediaOffline(event.getApp(), event.getStream()); eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); } }else { storager.removeMedia(event.getApp(), event.getStream()); } }