| | |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | |
| | | |
| | | @Autowired |
| | | private RedisTemplate<Object, Object> redisTemplate; |
| | | |
| | | @Autowired |
| | | private ZlmHttpHookSubscribe hookSubscribe; |
| | | |
| | | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | |
| | | import com.genersoft.iot.vmp.media.bean.MediaInfo; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.service.*; |
| | | import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; |
| | | import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | |
| | | logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); |
| | | |
| | | if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { |
| | | // 查询这路流是否是本平台的 |
| | | StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (push!= null && !push.isSelf()) { |
| | | // 不是本平台的就发送redis消息让其他wvp停止发流 |
| | | ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); |
| | | if (platform != null) { |
| | | RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); |
| | | redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); |
| | | } |
| | | }else { |
| | | MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), |
| | | callIdHeader.getCallId(), null); |
| | | mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); |
| | | if (userSetting.getUseCustomSsrcForParentInvite()) { |
| | | mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); |
| | | // 不是本平台的就发送redis消息让其他wvp停止发流 |
| | | ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); |
| | | if (platform != null) { |
| | | redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); |
| | | if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { |
| | | redisRpcService.stopSendRtp(sendRtpItem.getRedisKey()); |
| | | redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null); |
| | | }else { |
| | | MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | redisCatchStorage.deleteSendRTPServer(null, null, callIdHeader.getCallId(), null); |
| | | mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); |
| | | if (userSetting.getUseCustomSsrcForParentInvite()) { |
| | | mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrc()); |
| | | } |
| | | } |
| | | }else { |
| | | logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); |
| | |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; |
| | | import com.genersoft.iot.vmp.media.bean.MediaInfo; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.media.event.hook.Hook; |
| | | import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; |
| | | import com.genersoft.iot.vmp.media.event.hook.HookType; |
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.service.IInviteStreamService; |
| | |
| | | import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; |
| | | import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.*; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; |
| | | import com.genersoft.iot.vmp.service.*; |
| | |
| | | String startTimeStr = DateUtil.urlFormatter.format(start); |
| | | String endTimeStr = DateUtil.urlFormatter.format(end); |
| | | String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; |
| | | SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, device.getStreamModeForParam()); |
| | | SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.getHasAudio(), false, device.getStreamModeForParam()); |
| | | sendRtpItem.setStream(stream); |
| | | // 写入redis, 超时时回复 |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | |
| | | } |
| | | |
| | | sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); |
| | | SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam()); |
| | | SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.getHasAudio(), false, device.getStreamModeForParam()); |
| | | sendRtpItem.setStream(ssrcInfo.getStream()); |
| | | // 写入redis, 超时时回复 |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | |
| | | // 从redis查询是否正在接收这个推流 |
| | | StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); |
| | | if (pushListItem != null) { |
| | | sendRtpItem.setServerId(pushListItem.getSeverId()); |
| | | sendRtpItem.setServerId(pushListItem.getServerId()); |
| | | sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); |
| | | |
| | | StreamPushItem transform = streamPushService.transform(pushListItem); |
| | | transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); |
| | | transform.setSelf(userSetting.getServerId().equals(pushListItem.getServerId())); |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | // 开始推流 |
| | | sendPushStream(sendRtpItem, mediaServerItem, platform, request); |
| | |
| | | /** |
| | | * 安排推流 |
| | | */ |
| | | private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (streamReady != null && streamReady) { |
| | | private void sendProxyStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | |
| | | if (mediaInfo != null) { |
| | | |
| | | // 自平台内容 |
| | | int localPort = sendRtpPortManager.getNextPort(mediaServerItem); |
| | |
| | | sendRtpItem.setStatus(1); |
| | | sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); |
| | | |
| | | SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); |
| | | SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform); |
| | | if (response != null) { |
| | | sendRtpItem.setToTag(response.getToTag()); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | private void sendPushStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | // 推流 |
| | | if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { |
| | | Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (streamReady != null && streamReady) { |
| | | MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (mediaInfo != null ) { |
| | | // 自平台内容 |
| | | int localPort = sendRtpPortManager.getNextPort(mediaServerItem); |
| | | if (localPort == 0) { |
| | |
| | | } |
| | | // 写入redis, 超时时回复 |
| | | sendRtpItem.setStatus(1); |
| | | SIPResponse response = sendStreamAck(request, sendRtpItem, platform); |
| | | SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform); |
| | | if (response != null) { |
| | | sendRtpItem.setToTag(response.getToTag()); |
| | | } |
| | |
| | | /** |
| | | * 通知流上线 |
| | | */ |
| | | private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | // TODO 控制启用以使设备上线 |
| | | logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | // 监听流上线 |
| | | HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); |
| | | zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { |
| | | OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; |
| | | logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); |
| | | Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), mediaServerItem.getId()); |
| | | hookSubscribe.addSubscribe(hook, (hookData)->{ |
| | | logger.info("[上级点播]拉流代理已经就绪, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | dynamicTask.stop(sendRtpItem.getCallId()); |
| | | sendProxyStream(sendRtpItem, mediaServerItem, platform, request); |
| | | }); |
| | | dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { |
| | | logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); |
| | | hookSubscribe.removeSubscribe(hook); |
| | | }, userSetting.getPlatformPlayTimeout()); |
| | | boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (!start) { |
| | |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); |
| | | } |
| | | zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); |
| | | hookSubscribe.removeSubscribe(hook); |
| | | dynamicTask.stop(sendRtpItem.getCallId()); |
| | | } |
| | | } |
| | |
| | | /** |
| | | * 通知流上线 |
| | | */ |
| | | private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) { |
| | | // 发送redis消息以使设备上线,流上线后被 |
| | | logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, |
| | |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | } |
| | | |
| | | public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { |
| | | public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) { |
| | | |
| | | String sdpIp = sendRtpItem.getLocalIp(); |
| | | if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { |
| | |
| | | logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(), |
| | | mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP"); |
| | | |
| | | MediaServer mediaServerItem = broadcastCatch.getMediaServerItem(); |
| | | MediaServer mediaServerItem = broadcastCatch.getMediaServer(); |
| | | if (mediaServerItem == null) { |
| | | logger.warn("未找到语音喊话使用的zlm"); |
| | | try { |
| | |
| | | }catch (Exception e) { |
| | | logger.error("[向上级转发移动位置失败] ", e); |
| | | } |
| | | if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { |
| | | if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId())) { |
| | | List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); |
| | | channels.forEach(channel -> { |
| | | // 发送redis消息。 通知位置信息的变化 |
| | |
| | | |
| | | SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, |
| | | String app, String stream, String channelId, boolean tcp, boolean rtcp); |
| | | |
| | | MediaServer getMediaServerByAppAndStream(String app, String stream); |
| | | } |
| | |
| | | sendRtpItem.setRtcp(rtcp); |
| | | return sendRtpItem; |
| | | } |
| | | |
| | | @Override |
| | | public MediaServer getMediaServerByAppAndStream(String app, String stream) { |
| | | List<MediaServer> mediaServerList = getAll(); |
| | | for (MediaServer mediaServer : mediaServerList) { |
| | | MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream); |
| | | if (mediaInfo != null) { |
| | | return mediaServer; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | } |
| | |
| | | import com.genersoft.iot.vmp.conf.UserSetting;
|
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
| | | import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
| | | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
| | | import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
| | | import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
| | | import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
| | | import com.genersoft.iot.vmp.media.bean.MediaServer;
|
| | | import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
|
| | | import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
|
| | | import com.genersoft.iot.vmp.media.event.media.*;
|
| | | import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
|
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
| | | import com.genersoft.iot.vmp.media.bean.MediaServer;
|
| | | import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig;
|
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
|
| | | import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent;
|
| | | import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent;
|
| | | import com.genersoft.iot.vmp.service.*;
|
| | | import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
| | | import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
| | |
| | | if (mediaServer == null) {
|
| | | return new HookResultForOnPublish(0, "success");
|
| | | }
|
| | | // 推流鉴权的处理
|
| | | if (!"rtp".equals(param.getApp())) {
|
| | | StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
|
| | | if (stream != null) {
|
| | | HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
|
| | | result.setEnable_audio(stream.isEnableAudio());
|
| | | result.setEnable_mp4(stream.isEnableMp4());
|
| | | return result;
|
| | | }
|
| | | if (userSetting.getPushAuthority()) {
|
| | | // 推流鉴权
|
| | | if (param.getParams() == null) {
|
| | | logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
|
| | | return new HookResultForOnPublish(401, "Unauthorized");
|
| | | }
|
| | | Map<String, String> paramMap = urlParamToMap(param.getParams());
|
| | | String sign = paramMap.get("sign");
|
| | | if (sign == null) {
|
| | | logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
|
| | | return new HookResultForOnPublish(401, "Unauthorized");
|
| | | }
|
| | | // 推流自定义播放鉴权码
|
| | | String callId = paramMap.get("callId");
|
| | | // 鉴权配置
|
| | | boolean hasAuthority = userService.checkPushAuthority(callId, sign);
|
| | | if (!hasAuthority) {
|
| | | logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
|
| | | return new HookResultForOnPublish(401, "Unauthorized");
|
| | | }
|
| | | StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
|
| | | streamAuthorityInfo.setCallId(callId);
|
| | | streamAuthorityInfo.setSign(sign);
|
| | | // 鉴权通过
|
| | | redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
|
| | | }
|
| | | } else {
|
| | | zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
|
| | | }
|
| | |
|
| | |
|
| | | HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
|
| | | result.setEnable_audio(true);
|
| | | taskExecutor.execute(() -> {
|
| | | ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
|
| | | if (subscribe != null) {
|
| | | subscribe.response(mediaInfo, param);
|
| | | }
|
| | | });
|
| | |
|
| | | ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
|
| | | if (resultForOnPublish != null) {
|
| | |
| | | applicationEventPublisher.publishEvent(mediaDepartureEvent);
|
| | | }
|
| | |
|
| | | JSONObject json = (JSONObject) JSON.toJSON(param);
|
| | | taskExecutor.execute(() -> {
|
| | | ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
|
| | | MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
|
| | | if (mediaInfo == null) {
|
| | | logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId());
|
| | | return;
|
| | | }
|
| | | if (subscribe != null) {
|
| | | subscribe.response(mediaInfo, param);
|
| | | }
|
| | |
|
| | | List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
|
| | | // TODO 重构此处逻辑
|
| | | if (param.isRegist()) {
|
| | | // 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖
|
| | | if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|
| | | || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|
| | | || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
|
| | | StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
|
| | | if (streamAuthorityInfo == null) {
|
| | | streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
|
| | | } else {
|
| | | streamAuthorityInfo.setOriginType(param.getOriginType());
|
| | | streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr());
|
| | | }
|
| | | redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
|
| | | }
|
| | | }
|
| | | if ("rtsp".equals(param.getSchema())) {
|
| | | logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
|
| | | if (param.isRegist()) {
|
| | | mediaServerService.addCount(param.getMediaServerId());
|
| | | } else {
|
| | | mediaServerService.removeCount(param.getMediaServerId());
|
| | | }
|
| | |
|
| | | int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
|
| | | if (updateStatusResult > 0) {
|
| | |
|
| | | }
|
| | |
|
| | | if ("rtp".equals(param.getApp()) && !param.isRegist()) {
|
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
|
| | | if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
|
| | | inviteStreamService.removeInviteInfo(inviteInfo);
|
| | | storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
|
| | | }
|
| | | } else if ("broadcast".equals(param.getApp())) {
|
| | | // 语音对讲推流 stream需要满足格式deviceId_channelId
|
| | | if (param.getStream().indexOf("_") > 0) {
|
| | | String[] streamArray = param.getStream().split("_");
|
| | | if (streamArray.length == 2) {
|
| | | String deviceId = streamArray[0];
|
| | | String channelId = streamArray[1];
|
| | | Device device = deviceService.getDevice(deviceId);
|
| | | if (device != null) {
|
| | | if (param.isRegist()) {
|
| | | if (audioBroadcastManager.exit(deviceId, channelId)) {
|
| | | playService.stopAudioBroadcast(deviceId, channelId);
|
| | | }
|
| | | // 开启语音对讲通道
|
| | | try {
|
| | | playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> {
|
| | | logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
|
| | | });
|
| | | } catch (InvalidArgumentException | ParseException | SipException e) {
|
| | | logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
|
| | | }
|
| | | } else {
|
| | | // 流注销
|
| | | playService.stopAudioBroadcast(deviceId, channelId);
|
| | | }
|
| | | } else {
|
| | | logger.info("[语音对讲] 未找到设备:{}", deviceId);
|
| | | }
|
| | | }
|
| | | }
|
| | | } else if ("talk".equals(param.getApp())) {
|
| | | // 语音对讲推流 stream需要满足格式deviceId_channelId
|
| | | if (param.getStream().indexOf("_") > 0) {
|
| | | String[] streamArray = param.getStream().split("_");
|
| | | if (streamArray.length == 2) {
|
| | | String deviceId = streamArray[0];
|
| | | String channelId = streamArray[1];
|
| | | Device device = deviceService.getDevice(deviceId);
|
| | | if (device != null) {
|
| | | if (param.isRegist()) {
|
| | | if (audioBroadcastManager.exit(deviceId, channelId)) {
|
| | | playService.stopAudioBroadcast(deviceId, channelId);
|
| | | }
|
| | | // 开启语音对讲通道
|
| | | playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> {
|
| | | logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
|
| | | });
|
| | | } else {
|
| | | // 流注销
|
| | | playService.stopTalk(device, channelId, param.isRegist());
|
| | | }
|
| | | } else {
|
| | | logger.info("[语音对讲] 未找到设备:{}", deviceId);
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | } else {
|
| | | if (!"rtp".equals(param.getApp())) {
|
| | | String type = OriginType.values()[param.getOriginType()].getType();
|
| | | if (param.isRegist()) {
|
| | | StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
|
| | | param.getApp(), param.getStream());
|
| | | String callId = null;
|
| | | if (streamAuthorityInfo != null) {
|
| | | callId = streamAuthorityInfo.getCallId();
|
| | | }
|
| | | StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
|
| | | param.getApp(), param.getStream(), tracks, callId);
|
| | | param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
|
| | | redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
|
| | | if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|
| | | || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|
| | | || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
|
| | | param.setSeverId(userSetting.getServerId());
|
| | | zlmMediaListManager.addPush(param);
|
| | |
|
| | | // 冗余数据,自己系统中自用
|
| | | redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
|
| | | }
|
| | | } else {
|
| | | // 兼容流注销时类型从redis记录获取
|
| | | OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
|
| | | param.getApp(), param.getStream(), param.getMediaServerId());
|
| | | if (onStreamChangedHookParam != null) {
|
| | | type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
|
| | | redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
|
| | | if ("PUSH".equalsIgnoreCase(type)) {
|
| | | // 冗余数据,自己系统中自用
|
| | | redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId());
|
| | | }
|
| | | }
|
| | | GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
|
| | | if (gbStream != null) {
|
| | | // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
|
| | | }
|
| | | zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
|
| | | }
|
| | | GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
|
| | | if (gbStream != null) {
|
| | | if (userSetting.isUsePushingAsStatus()) {
|
| | | eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
|
| | | }
|
| | | }
|
| | | if (type != null) {
|
| | | // 发送流变化redis消息
|
| | | JSONObject jsonObject = new JSONObject();
|
| | | jsonObject.put("serverId", userSetting.getServerId());
|
| | | jsonObject.put("app", param.getApp());
|
| | | jsonObject.put("stream", param.getStream());
|
| | | jsonObject.put("register", param.isRegist());
|
| | | jsonObject.put("mediaServerId", param.getMediaServerId());
|
| | | redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
| | | }
|
| | | }
|
| | | }
|
| | | if (!param.isRegist()) {
|
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
|
| | | if (!sendRtpItems.isEmpty()) {
|
| | | for (SendRtpItem sendRtpItem : sendRtpItems) {
|
| | | if (sendRtpItem == null) {
|
| | | continue;
|
| | | }
|
| | |
|
| | | if (sendRtpItem.getApp().equals(param.getApp())) {
|
| | | logger.info(sendRtpItem.toString());
|
| | | if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
|
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
| | | sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
| | | sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
|
| | | // 通知其他wvp停止发流
|
| | | redisCatchStorage.sendPushStreamClose(messageForPushChannel);
|
| | | }else {
|
| | | String platformId = sendRtpItem.getPlatformId();
|
| | | ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
|
| | | Device device = deviceService.getDevice(platformId);
|
| | |
|
| | | try {
|
| | | if (platform != null) {
|
| | | commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
|
| | | redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
|
| | | sendRtpItem.getCallId(), sendRtpItem.getStream());
|
| | | } else {
|
| | | cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
|
| | | if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|
| | | || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
|
| | | AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
| | | if (audioBroadcastCatch != null) {
|
| | | // 来自上级平台的停止对讲
|
| | | logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
| | | audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
| | | }
|
| | | }
|
| | | }
|
| | | } catch (SipException | InvalidArgumentException | ParseException |
|
| | | SsrcTransactionNotFoundException e) {
|
| | | logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
|
| | | }
|
| | | }
|
| | |
|
| | | }
|
| | | }
|
| | | }
|
| | | }
|
| | | }
|
| | | });
|
| | | return HookResult.SUCCESS();
|
| | | }
|
| | |
|
| | |
| | | logger.info("[ZLM HOOK]流无人观看:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
|
| | | param.getApp(), param.getStream());
|
| | | JSONObject ret = new JSONObject();
|
| | | ret.put("code", 0);
|
| | | // 国标类型的流
|
| | | if ("rtp".equals(param.getApp())) {
|
| | | ret.put("close", userSetting.getStreamOnDemand());
|
| | | // 国标流, 点播/录像回放/录像下载
|
| | | InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
|
| | | // 点播
|
| | | if (inviteInfo != null) {
|
| | | // 录像下载
|
| | | if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
|
| | | ret.put("close", false);
|
| | | return ret;
|
| | | }
|
| | | // 收到无人观看说明流也没有在往上级推送
|
| | | if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
|
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
|
| | | inviteInfo.getChannelId());
|
| | | if (!sendRtpItems.isEmpty()) {
|
| | | for (SendRtpItem sendRtpItem : sendRtpItems) {
|
| | | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
| | | try {
|
| | | commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
|
| | | } catch (SipException | InvalidArgumentException | ParseException e) {
|
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
| | | }
|
| | | redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
|
| | | sendRtpItem.getCallId(), sendRtpItem.getStream());
|
| | | if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
|
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
| | | sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
| | | sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
|
| | | messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
|
| | | redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
|
| | | }
|
| | | }
|
| | | }
|
| | | }
|
| | | Device device = deviceService.getDevice(inviteInfo.getDeviceId());
|
| | | if (device != null) {
|
| | | try {
|
| | | // 多查询一次防止已经被处理了
|
| | | InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
|
| | | inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
|
| | | if (info != null) {
|
| | | cmder.streamByeCmd(device, inviteInfo.getChannelId(),
|
| | | inviteInfo.getStream(), null);
|
| | | } else {
|
| | | logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), param.getStream());
|
| | | }
|
| | | } catch (InvalidArgumentException | ParseException | SipException |
|
| | | SsrcTransactionNotFoundException e) {
|
| | | logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
|
| | | }
|
| | | } else {
|
| | | logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), param.getStream());
|
| | | }
|
| | |
|
| | | boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema());
|
| | | ret.put("code", close);
|
| | | return ret;
|
| | |
| | | if (!"rtp".equals(param.getApp())) {
|
| | | return HookResult.SUCCESS();
|
| | | }
|
| | | taskExecutor.execute(() -> {
|
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
|
| | | if (sendRtpItems.size() > 0) {
|
| | | for (SendRtpItem sendRtpItem : sendRtpItems) {
|
| | | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
| | | ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
|
| | | try {
|
| | | commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
|
| | | } catch (SipException | InvalidArgumentException | ParseException e) {
|
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
| | | }
|
| | | redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
|
| | | sendRtpItem.getCallId(), sendRtpItem.getStream());
|
| | | }
|
| | | try {
|
| | | MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this);
|
| | | MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
|
| | | if (mediaServerItem != null) {
|
| | | event.setMediaServer(mediaServerItem);
|
| | | applicationEventPublisher.publishEvent(event);
|
| | | }
|
| | | });
|
| | | }catch (Exception e) {
|
| | | logger.info("[ZLM-HOOK-rtp发送关闭] 发送通知失败 ", e);
|
| | | }
|
| | |
|
| | | return HookResult.SUCCESS();
|
| | | }
|
| | |
| | | param.put("ssrc", ssrc); |
| | | } |
| | | JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); |
| | | return (jsonObject != null && jsonObject.getInteger("code") == 0); |
| | | if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { |
| | | logger.error("停止发流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); |
| | | throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); |
| | | } |
| | | return true; |
| | | |
| | | } |
| | | |
| | |
| | | return result; |
| | | } |
| | | |
| | | public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) { |
| | | public JSONObject stopSendRtpStream(MediaServer mediaServerItem, SendRtpItem sendRtpItem) { |
| | | Map<String, Object> param = new HashMap<>(); |
| | | param.put("vhost", "__defaultVhost__"); |
| | | param.put("app", sendRtpItem.getApp()); |
| | |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.conf.exception.ControllerException; |
| | | import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; |
| | |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; |
| | | import com.genersoft.iot.vmp.media.bean.MediaInfo; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.media.event.hook.Hook; |
| | | import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; |
| | | import com.genersoft.iot.vmp.media.event.hook.HookType; |
| | | import com.genersoft.iot.vmp.media.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; |
| | |
| | | private UserSetting userSetting; |
| | | |
| | | @Autowired |
| | | private ZlmHttpHookSubscribe hookSubscribe; |
| | | |
| | | @Autowired |
| | | private ZLMServerFactory zlmServerFactory; |
| | | |
| | | private HookSubscribe hookSubscribe; |
| | | |
| | | @Autowired |
| | | private RedisTemplate<Object, Object> redisTemplate; |
| | |
| | | } |
| | | logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); |
| | | // 查询本级是否有这个流 |
| | | MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (mediaServerItem == null) { |
| | | RedisRpcResponse response = request.getResponse(); |
| | | response.setStatusCode(200); |
| | |
| | | SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); |
| | | logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); |
| | | // 查询本级是否有这个流 |
| | | MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (mediaServerItem != null) { |
| | | MediaServer mediaServer = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (mediaServer != null) { |
| | | logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); |
| | | // 读取redis中的上级点播信息,生成sendRtpItm发送出去 |
| | | if (sendRtpItem.getSsrc() == null) { |
| | | // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 |
| | | String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); |
| | | String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServer.getId()) : ssrcFactory.getPlayBackSsrc(mediaServer.getId()); |
| | | sendRtpItem.setSsrc(ssrc); |
| | | } |
| | | sendRtpItem.setMediaServerId(mediaServerItem.getId()); |
| | | sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); |
| | | sendRtpItem.setMediaServerId(mediaServer.getId()); |
| | | sendRtpItem.setLocalIp(mediaServer.getSdpIp()); |
| | | sendRtpItem.setServerId(userSetting.getServerId()); |
| | | |
| | | redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); |
| | |
| | | response.setStatusCode(200); |
| | | } |
| | | // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 |
| | | HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( |
| | | sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); |
| | | |
| | | hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { |
| | | Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); |
| | | hookSubscribe.addSubscribe(hook, (hookData) -> { |
| | | logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); |
| | | // 读取redis中的上级点播信息,生成sendRtpItm发送出去 |
| | | if (sendRtpItem.getSsrc() == null) { |
| | | // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 |
| | | String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); |
| | | String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId()); |
| | | sendRtpItem.setSsrc(ssrc); |
| | | } |
| | | sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); |
| | | sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); |
| | | sendRtpItem.setMediaServerId(hookData.getMediaServer().getId()); |
| | | sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); |
| | | sendRtpItem.setServerId(userSetting.getServerId()); |
| | | |
| | | redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); |
| | |
| | | SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); |
| | | logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); |
| | | // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 |
| | | HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( |
| | | sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); |
| | | Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); |
| | | hookSubscribe.removeSubscribe(hook); |
| | | RedisRpcResponse response = request.getResponse(); |
| | | response.setStatusCode(200); |
| | |
| | | return response; |
| | | } |
| | | logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | if (mediaServerItem == null) { |
| | | MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | if (mediaServer == null) { |
| | | logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); |
| | | WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); |
| | | response.setBody(wvpResult); |
| | | return response; |
| | | } |
| | | |
| | | Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (!streamReady) { |
| | | MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); |
| | | if (mediaInfo != null) { |
| | | logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); |
| | | WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); |
| | | response.setBody(wvpResult); |
| | | return response; |
| | | } |
| | | JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); |
| | | if (jsonObject.getInteger("code") == 0) { |
| | | logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); |
| | | WVPResult wvpResult = WVPResult.success(); |
| | | try { |
| | | mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); |
| | | }catch (ControllerException exception) { |
| | | logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg()); |
| | | WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg()); |
| | | response.setBody(wvpResult); |
| | | }else { |
| | | logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject); |
| | | WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg")); |
| | | response.setBody(wvpResult); |
| | | return response; |
| | | } |
| | | logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); |
| | | WVPResult wvpResult = WVPResult.success(); |
| | | response.setBody(wvpResult); |
| | | return response; |
| | | } |
| | | |
| | |
| | | return response; |
| | | } |
| | | logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | if (mediaServerItem == null) { |
| | | MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | if (mediaServer == null) { |
| | | logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); |
| | | WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); |
| | | response.setBody(wvpResult); |
| | | return response; |
| | | } |
| | | JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); |
| | | if (jsonObject.getInteger("code") == 0) { |
| | | logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); |
| | | response.setBody(WVPResult.success()); |
| | | return response; |
| | | }else { |
| | | int code = jsonObject.getInteger("code"); |
| | | String msg = jsonObject.getString("msg"); |
| | | logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg ); |
| | | response.setBody(WVPResult.fail(code, msg)); |
| | | try { |
| | | mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); |
| | | }catch (ControllerException exception) { |
| | | logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), |
| | | sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() ); |
| | | response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg())); |
| | | return response; |
| | | } |
| | | logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); |
| | | response.setBody(WVPResult.success()); |
| | | return response; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.genersoft.iot.vmp.common.SystemAllInfo; |
| | | import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.gb28181.bean.*; |
| | | import com.genersoft.iot.vmp.media.bean.MediaInfo; |
| | | import com.genersoft.iot.vmp.media.bean.MediaServer; |
| | | import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; |
| | |
| | | "<if test='item.gpsTime != null'>, gps_time=#{item.gpsTime}</if>" + |
| | | "<if test='item.streamIdentification != null'>, stream_identification=#{item.streamIdentification}</if>" + |
| | | "<if test='item.id > 0'>WHERE id=#{item.id}</if>" + |
| | | "<if test='item.id == 0'>WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}</if>" + |
| | | "<if test='item.id == 0 and item.channelId != null '>WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}</if>" + |
| | | "<if test='item.id == 0 and item.channelId == null '>WHERE device_id=#{item.deviceId}</if>" + |
| | | "</foreach>" + |
| | | "</script>"}) |
| | | int batchUpdate(List<DeviceChannel> updateChannels); |