| | |
| | | @Autowired |
| | | private RedisPushStreamResponseListener redisPushStreamResponseListener; |
| | | |
| | | @Autowired |
| | | private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; |
| | | |
| | | |
| | | /** |
| | | * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 |
| | |
| | | container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); |
| | | container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); |
| | | container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); |
| | | // container.addMessageListener(, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); |
| | | container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); |
| | | return container; |
| | | } |
| | | } |
| | |
| | | viaHeader.setRPort(); |
| | | viaHeaders.add(viaHeader); |
| | | // from |
| | | SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(platform.getDeviceGBId(), |
| | | SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sendRtpItem.getChannelId(), |
| | | platform.getDeviceIp() + ":" + platform.getDevicePort()); |
| | | Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); |
| | | FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sendRtpItem.getToTag()); |
| | |
| | | MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); |
| | | // ceq |
| | | CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE); |
| | | MessageFactoryImpl messageFactory = (MessageFactoryImpl) SipFactory.getInstance().createMessageFactory(); |
| | | // 设置编码, 防止中文乱码 |
| | | messageFactory.setDefaultContentEncodingCharset("gb2312"); |
| | | |
| | | CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId()); |
| | | |
| | | request = (SIPRequest) messageFactory.createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader, |
| | | request = (SIPRequest) SipFactory.getInstance().createMessageFactory().createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader, |
| | | toHeader, viaHeaders, maxForwards); |
| | | |
| | | request.addHeader(SipUtils.createUserAgentHeader(gitUtil)); |
| | |
| | | String sipAddress = platform.getDeviceIp() + ":" + platform.getDevicePort(); |
| | | Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory() |
| | | .createSipURI(platform.getDeviceGBId(), sipAddress)); |
| | | |
| | | request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress)); |
| | | |
| | | return request; |
| | |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
| | | errorEvent.response(e);
|
| | | }), e -> {
|
| | | // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
|
| | | ResponseEvent responseEvent = (ResponseEvent) e.event;
|
| | | SIPResponse response = (SIPResponse) responseEvent.getResponse();
|
| | | streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play);
|
| | | streamSession.put(device.getDeviceId(), channelId, e.callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play);
|
| | | okEvent.response(e);
|
| | | });
|
| | | }
|
| | |
| | | sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
|
| | | ResponseEvent responseEvent = (ResponseEvent) event.event;
|
| | | SIPResponse response = (SIPResponse) responseEvent.getResponse();
|
| | | streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback);
|
| | | streamSession.put(device.getDeviceId(), channelId, event.callId, ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback);
|
| | | okEvent.response(event);
|
| | | });
|
| | | if (inviteStreamCallback != null) {
|
| | |
| | | if (ssrcIndex >= 0) {
|
| | | ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
| | | }
|
| | | streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
|
| | | streamSession.put(device.getDeviceId(), channelId, newCallIdHeader.getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
|
| | | okEvent.response(event);
|
| | | });
|
| | | }
|
| | |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.service.IDeviceChannelService; |
| | | import com.genersoft.iot.vmp.service.IDeviceService; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IPlatformService; |
| | |
| | | import javax.sip.InvalidArgumentException; |
| | | import javax.sip.RequestEvent; |
| | | import javax.sip.SipException; |
| | | import javax.sip.address.SipURI; |
| | | import javax.sip.header.CallIdHeader; |
| | | import javax.sip.header.FromHeader; |
| | | import javax.sip.header.HeaderAddress; |
| | | import javax.sip.header.ToHeader; |
| | | import javax.sip.message.Response; |
| | | import java.text.ParseException; |
| | | import java.util.HashMap; |
| | |
| | | |
| | | @Autowired |
| | | private IDeviceService deviceService; |
| | | |
| | | @Autowired |
| | | private IDeviceChannelService channelService; |
| | | |
| | | @Autowired |
| | | private IVideoManagerStorage storager; |
| | |
| | | */ |
| | | @Override |
| | | public void process(RequestEvent evt) { |
| | | |
| | | SIPRequest request = (SIPRequest) evt.getRequest(); |
| | | try { |
| | | responseAck((SIPRequest) evt.getRequest(), Response.OK); |
| | | responseAck(request, Response.OK); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[回复BYE信息失败],{}", e.getMessage()); |
| | | } |
| | | CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); |
| | | String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); |
| | | String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); |
| | | SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); |
| | | logger.info("[收到bye] {}/{}", platformGbId, channelId); |
| | | |
| | | SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); |
| | | |
| | | if (sendRtpItem != null){ |
| | | logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); |
| | | String streamId = sendRtpItem.getStreamId(); |
| | | Map<String, Object> param = new HashMap<>(); |
| | | param.put("vhost","__defaultVhost__"); |
| | |
| | | param.put("ssrc",sendRtpItem.getSsrc()); |
| | | logger.info("[收到bye] 停止向上级推流:{}", streamId); |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); |
| | | redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), |
| | | callIdHeader.getCallId(), null); |
| | | zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); |
| | | if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { |
| | | ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); |
| | |
| | | logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); |
| | | } |
| | | try { |
| | | logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId); |
| | | cmder.streamByeCmd(device, channelId, streamId, null); |
| | | logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); |
| | | cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); |
| | | } catch (InvalidArgumentException | ParseException | SipException | |
| | | SsrcTransactionNotFoundException e) { |
| | | logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); |
| | | } |
| | | } |
| | | // if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { |
| | | // MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, |
| | | // sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), |
| | | // sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); |
| | | // redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); |
| | | // } |
| | | } |
| | | }else { |
| | | // 可能是设备发送的停止 |
| | | SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null); |
| | | if (ssrcTransaction == null) { |
| | | logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求"); |
| | | logger.info(request.toString()); |
| | | return; |
| | | } |
| | | // 可能是设备主动停止 |
| | | Device device = storager.queryVideoDeviceByChannelId(platformGbId); |
| | | if (device != null) { |
| | | storager.stopPlay(device.getDeviceId(), channelId); |
| | | StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); |
| | | logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); |
| | | |
| | | Device device = deviceService.getDevice(ssrcTransaction.getDeviceId()); |
| | | if (device == null) { |
| | | logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId()); |
| | | return; |
| | | } |
| | | DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); |
| | | if (channel == null) { |
| | | logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); |
| | | return; |
| | | } |
| | | storager.stopPlay(device.getDeviceId(), channel.getChannelId()); |
| | | StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channel.getChannelId()); |
| | | if (streamInfo != null) { |
| | | redisCatchStorage.stopPlay(streamInfo); |
| | | mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); |
| | | } |
| | | SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); |
| | | if (ssrcTransactionForPlay != null){ |
| | | if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ |
| | | // 释放ssrc |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); |
| | | if (mediaServerItem != null) { |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc()); |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); |
| | | } |
| | | streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); |
| | | streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream()); |
| | | } |
| | | } |
| | | SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); |
| | | if (ssrcTransactionForPlayBack != null) { |
| | | // 释放ssrc |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); |
| | | if (mediaServerItem != null) { |
| | | mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); |
| | | } |
| | | streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | |
| | | if (rtpInfo.getBoolean("exist")) { |
| | | int localPort = rtpInfo.getInteger("local_port"); |
| | | if (localPort == 0) { |
| | | logger.warn("[点播],点播时发现rtpServerC存在,但是尚未开始推流"); |
| | | logger.warn("[点播],点播时发现rtpServer存在,但是尚未开始推流"); |
| | | // 此时说明rtpServer已经创建但是流还没有推上来 |
| | | WVPResult wvpResult = new WVPResult(); |
| | | wvpResult.setCode(ErrorCode.ERROR100.getCode()); |
| | |
| | | package com.genersoft.iot.vmp.service.redisMsg; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | | import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; |
| | | import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.data.redis.connection.Message; |
| | | import org.springframework.data.redis.connection.MessageListener; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import javax.sip.InvalidArgumentException; |
| | | import javax.sip.SipException; |
| | | import java.text.ParseException; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | |
| | | /** |
| | | * 接收redis发送的结束推流请求 |
| | |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); |
| | | |
| | | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | | @Qualifier("taskExecutor") |
| | | @Autowired |
| | | private ThreadPoolTaskExecutor taskExecutor; |
| | | private IStreamPushService streamPushService; |
| | | |
| | | @Autowired |
| | | private IRedisCatchStorage redisCatchStorage; |
| | | |
| | | @Autowired |
| | | private IVideoManagerStorage storager; |
| | | |
| | | @Autowired |
| | | private ISIPCommanderForPlatform commanderFroPlatform; |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | | |
| | | @Autowired |
| | | private IMediaServerService mediaServerService; |
| | | |
| | | @Autowired |
| | | private ZLMRTPServerFactory zlmrtpServerFactory; |
| | | |
| | | |
| | | private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); |
| | |
| | | |
| | | @Override |
| | | public void onMessage(Message message, byte[] bytes) { |
| | | logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); |
| | | boolean isEmpty = taskQueue.isEmpty(); |
| | | taskQueue.offer(message); |
| | | if (isEmpty) { |
| | | taskExecutor.execute(() -> { |
| | | while (!taskQueue.isEmpty()) { |
| | | Message msg = taskQueue.poll(); |
| | | logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody())); |
| | | MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); |
| | | StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); |
| | | if (push != null) { |
| | | if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) { |
| | | List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( |
| | | push.getGbId()); |
| | | if (sendRtpItems.size() > 0) { |
| | | for (SendRtpItem sendRtpItem : sendRtpItems) { |
| | | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); |
| | | // 停止向上级推流 |
| | | String streamId = sendRtpItem.getStreamId(); |
| | | Map<String, Object> param = new HashMap<>(); |
| | | param.put("vhost","__defaultVhost__"); |
| | | param.put("app",sendRtpItem.getApp()); |
| | | param.put("stream",streamId); |
| | | param.put("ssrc",sendRtpItem.getSsrc()); |
| | | logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId); |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); |
| | | zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); |
| | | |
| | | try { |
| | | MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); |
| | | if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ |
| | | logger.info("[REDIS消息-请求推流结果]:参数不全"); |
| | | continue; |
| | | commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); |
| | | } |
| | | // 查看正在等待的invite消息 |
| | | if (responseEvents.get(response.getApp() + response.getStream()) != null) { |
| | | responseEvents.get(response.getApp() + response.getStream()).run(response); |
| | | } |
| | | }catch (Exception e) { |
| | | logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); |
| | | logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); |
| | | if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { |
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, |
| | | sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), |
| | | sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); |
| | | messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); |
| | | redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | public void addEvent(String app, String stream, PushStreamResponseEvent callback) { |
| | | responseEvents.put(app + stream, callback); |