29个文件已修改
7个文件已添加
1个文件已重命名
| | |
| | | alter table parent_platform |
| | | add startOfflinePush int default 0 null; |
| | | alter table stream_push |
| | | add serverId varchar(50) not null; |
| | | |
| | | alter table parent_platform |
| | | add administrativeDivision varchar(50) not null; |
| | | |
| | | alter table parent_platform |
| | | add catalogGroup int default 1 null; |
| | | |
| | | alter table device |
| | | add ssrcCheck int default 0 null; |
| | | |
| | |
| | | //************************** 第三方 ****************************************
|
| | | public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
|
| | | public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
|
| | |
|
| | | }
|
| | |
| | |
|
| | | import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
| | | import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener;
|
| | | import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener;
|
| | | import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener;
|
| | | import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
|
| | | import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener;
|
| | | import org.apache.commons.lang3.StringUtils;
|
| | | import org.springframework.beans.factory.annotation.Autowired;
|
| | | import org.springframework.beans.factory.annotation.Value;
|
| | |
| | | private int poolMaxWait;
|
| | |
|
| | | @Autowired
|
| | | private RedisGPSMsgListener redisGPSMsgListener;
|
| | | private RedisGpsMsgListener redisGPSMsgListener;
|
| | |
|
| | | @Autowired
|
| | | private RedisAlarmMsgListener redisAlarmMsgListener;
|
| | |
|
| | | @Autowired
|
| | | private RedisStreamMsgListener redisStreamMsgListener;
|
| | |
|
| | | @Autowired
|
| | | private RedisGbPlayMsgListener redisGbPlayMsgListener;
|
| | |
|
| | | @Bean
|
| | | public JedisPool jedisPool() {
|
| | |
| | | container.setConnectionFactory(connectionFactory);
|
| | | container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
|
| | | container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
|
| | | container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
|
| | | container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
|
| | | return container;
|
| | | }
|
| | |
|
| | |
| | | private String mediaServerId; |
| | | |
| | | /** |
| | | * 使用的服务的ID |
| | | */ |
| | | private String serverId; |
| | | |
| | | /** |
| | | * invite的callId |
| | | */ |
| | | private String CallId; |
| | |
| | | public void setOnlyAudio(boolean onlyAudio) { |
| | | this.onlyAudio = onlyAudio; |
| | | } |
| | | |
| | | public String getServerId() { |
| | | return serverId; |
| | | } |
| | | |
| | | public void setServerId(String serverId) { |
| | | this.serverId = serverId; |
| | | } |
| | | } |
| | |
| | | String gbId = gbStream.getGbId(); |
| | | GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); |
| | | if (gpsMsgInfo != null) { // 无最新位置不发送 |
| | | logger.info("无最新位置不发送"); |
| | | if (logger.isDebugEnabled()) { |
| | | logger.debug("无最新位置不发送"); |
| | | } |
| | | // 经纬度都为0不发送 |
| | | if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { |
| | | continue; |
| | |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; |
| | | import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import com.genersoft.iot.vmp.utils.SerializeUtils; |
| | |
| | | public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); |
| | | private String method = "ACK"; |
| | | private final String method = "ACK"; |
| | | |
| | | @Autowired |
| | | private SIPProcessorObserver sipProcessorObserver; |
| | |
| | | |
| | | @Autowired |
| | | private ISIPCommanderForPlatform commanderForPlatform; |
| | | |
| | | @Autowired |
| | | private RedisGbPlayMsgListener redisGbPlayMsgListener; |
| | | |
| | | |
| | | /** |
| | |
| | | param.put("pt", sendRtpItem.getPt()); |
| | | param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); |
| | | param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); |
| | | if (mediaInfo == null) { |
| | | RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( |
| | | sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), |
| | | sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), |
| | | sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); |
| | | redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{ |
| | | startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); |
| | | }); |
| | | }else { |
| | | JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); |
| | | } |
| | | |
| | | |
| | | } |
| | | } |
| | | private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, |
| | | JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) { |
| | | if (jsonObject == null) { |
| | | logger.error("RTP推流失败: 请检查ZLM服务"); |
| | | } else if (jsonObject.getInteger("code") == 0) { |
| | |
| | | // 向上级平台 |
| | | commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); |
| | | } |
| | | } |
| | | |
| | | |
| | | // if (streamInfo == null) { // 流还没上来,对方就回复ack |
| | | // logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId()); |
| | | // // 监听流上线 |
| | | // // 添加订阅 |
| | | // JSONObject subscribeKey = new JSONObject(); |
| | | // subscribeKey.put("app", "rtp"); |
| | | // subscribeKey.put("stream", sendRtpItem.getStreamId()); |
| | | // subscribeKey.put("regist", true); |
| | | // subscribeKey.put("schema", "rtmp"); |
| | | // subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); |
| | | // subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, |
| | | // (MediaServerItem mediaServerItemInUse, JSONObject json)->{ |
| | | // Map<String, Object> param = new HashMap<>(); |
| | | // param.put("vhost","__defaultVhost__"); |
| | | // param.put("app",json.getString("app")); |
| | | // param.put("stream",json.getString("stream")); |
| | | // param.put("ssrc", sendRtpItem.getSsrc()); |
| | | // param.put("dst_url",sendRtpItem.getIp()); |
| | | // param.put("dst_port", sendRtpItem.getPort()); |
| | | // param.put("is_udp", is_Udp); |
| | | // param.put("src_port", sendRtpItem.getLocalPort()); |
| | | // zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | // }); |
| | | // }else { |
| | | // Map<String, Object> param = new HashMap<>(); |
| | | // param.put("vhost","__defaultVhost__"); |
| | | // param.put("app",streamInfo.getApp()); |
| | | // param.put("stream",streamInfo.getStream()); |
| | | // param.put("ssrc", sendRtpItem.getSsrc()); |
| | | // param.put("dst_url",sendRtpItem.getIp()); |
| | | // param.put("dst_port", sendRtpItem.getPort()); |
| | | // param.put("is_udp", is_Udp); |
| | | // param.put("src_port", sendRtpItem.getLocalPort()); |
| | | // |
| | | // JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | // if (jsonObject.getInteger("code") != 0) { |
| | | // logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream()); |
| | | // // 监听流上线 |
| | | // // 添加订阅 |
| | | // JSONObject subscribeKey = new JSONObject(); |
| | | // subscribeKey.put("app", "rtp"); |
| | | // subscribeKey.put("stream", streamInfo.getStream()); |
| | | // subscribeKey.put("regist", true); |
| | | // subscribeKey.put("schema", "rtmp"); |
| | | // subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); |
| | | // subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, |
| | | // (MediaServerItem mediaServerItemInUse, JSONObject json)->{ |
| | | // zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | // }); |
| | | // } |
| | | // } |
| | | } |
| | | } |
| | | } |
| | |
| | | cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); |
| | | } |
| | | if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { |
| | | MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); |
| | | messageForPushChannel.setType(0); |
| | | messageForPushChannel.setGbId(sendRtpItem.getChannelId()); |
| | | messageForPushChannel.setApp(sendRtpItem.getApp()); |
| | | messageForPushChannel.setStream(sendRtpItem.getStreamId()); |
| | | messageForPushChannel.setMediaServerId(sendRtpItem.getMediaServerId()); |
| | | messageForPushChannel.setPlatFormId(sendRtpItem.getPlatformId()); |
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, |
| | | sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), |
| | | sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); |
| | | redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); |
| | | } |
| | | } |
| | |
| | | @Component |
| | | public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { |
| | | |
| | | private String method = "CANCEL"; |
| | | private final String method = "CANCEL"; |
| | | |
| | | @Autowired |
| | | private SIPProcessorObserver sipProcessorObserver; |
| | |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IPlayService; |
| | | import com.genersoft.iot.vmp.service.IStreamPushService; |
| | | import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; |
| | | import com.genersoft.iot.vmp.service.bean.SSRCInfo; |
| | | import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); |
| | | |
| | | private String method = "INVITE"; |
| | | private final String method = "INVITE"; |
| | | |
| | | @Autowired |
| | | private SIPCommanderFroPlatform cmderFroPlatform; |
| | | |
| | | @Autowired |
| | | private IVideoManagerStorage storager; |
| | | |
| | | @Autowired |
| | | private IStreamPushService streamPushService; |
| | | |
| | | @Autowired |
| | | private IRedisCatchStorage redisCatchStorage; |
| | |
| | | private ZLMMediaListManager mediaListManager; |
| | | |
| | | |
| | | @Autowired |
| | | private RedisGbPlayMsgListener redisGbPlayMsgListener; |
| | | |
| | | |
| | | @Override |
| | | public void afterPropertiesSet() throws Exception { |
| | | // 添加消息处理的订阅 |
| | |
| | | /** |
| | | * 处理invite请求 |
| | | * |
| | | * @param evt |
| | | * 请求消息 |
| | | * @param evt 请求消息 |
| | | */ |
| | | @Override |
| | | public void process(RequestEvent evt) { |
| | | // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 |
| | | try { |
| | | Request request = evt.getRequest(); |
| | | SipURI sipURI = (SipURI) request.getRequestURI(); |
| | | SipURI sipUri = (SipURI) request.getRequestURI(); |
| | | //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。 |
| | | //String channelId = sipURI.getUser(); |
| | | String channelId = SipUtils.getChannelIdFromHeader(request); |
| | |
| | | CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); |
| | | if (requesterId == null || channelId == null) { |
| | | logger.info("无法从FromHeader的Address中获取到平台id,返回400"); |
| | | responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 |
| | | // 参数不全, 发400,请求错误 |
| | | responseAck(evt, Response.BAD_REQUEST); |
| | | return; |
| | | } |
| | | |
| | |
| | | DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); |
| | | GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); |
| | | PlatformCatalog catalog = storager.getCatalog(channelId); |
| | | |
| | | MediaServerItem mediaServerItem = null; |
| | | StreamPushItem streamPushItem = null; |
| | | // 不是通道可能是直播流 |
| | | if (channel != null && gbStream == null ) { |
| | | if (channel.getStatus() == 0) { |
| | |
| | | } |
| | | responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 |
| | | }else if(channel == null && gbStream != null){ |
| | | |
| | | String mediaServerId = gbStream.getMediaServerId(); |
| | | mediaServerItem = mediaServerService.getOne(mediaServerId); |
| | | if (mediaServerItem == null) { |
| | | if ("proxy".equals(gbStream.getStreamType())) { |
| | | logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); |
| | | responseAck(evt, Response.GONE); |
| | | return; |
| | | } else { |
| | | streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); |
| | | if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { |
| | | logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); |
| | | responseAck(evt, Response.GONE); |
| | | return; |
| | | } |
| | | } |
| | | } else { |
| | | if ("push".equals(gbStream.getStreamType())) { |
| | | streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); |
| | | if (streamPushItem == null) { |
| | | logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); |
| | | responseAck(evt, Response.GONE); |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 |
| | | }else if (catalog != null) { |
| | |
| | | } |
| | | } |
| | | }else if (gbStream != null) { |
| | | if (streamPushItem.isStatus()) { |
| | | // 在线状态 |
| | | pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | } else { |
| | | // 不在线 拉起 |
| | | notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | e.printStackTrace(); |
| | | logger.warn("sdp解析错误"); |
| | | e.printStackTrace(); |
| | | } catch (SdpParseException e) { |
| | | e.printStackTrace(); |
| | | } catch (SdpException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 安排推流 |
| | | */ |
| | | |
| | | private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, |
| | | CallIdHeader callIdHeader, MediaServerItem mediaServerItem, |
| | | int port, Boolean tcpActive, boolean mediaTransmissionTCP, |
| | | String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { |
| | | // 推流 |
| | | if (streamPushItem.getServerId().equals(userSetting.getServerId())) { |
| | | Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); |
| | | if (!streamReady ) { |
| | | if (streamReady) { |
| | | // 自平台内容 |
| | | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, |
| | | gbStream.getApp(), gbStream.getStream(), channelId, |
| | | mediaTransmissionTCP); |
| | | |
| | | if (sendRtpItem == null) { |
| | | logger.warn("服务器端口资源不足"); |
| | | responseAck(evt, Response.BUSY_HERE); |
| | | return; |
| | | } |
| | | if (tcpActive != null) { |
| | | sendRtpItem.setTcpActive(tcpActive); |
| | | } |
| | | sendRtpItem.setPlayType(InviteStreamType.PUSH); |
| | | // 写入redis, 超时时回复 |
| | | sendRtpItem.setStatus(1); |
| | | sendRtpItem.setCallId(callIdHeader.getCallId()); |
| | | byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); |
| | | sendRtpItem.setDialog(dialogByteArray); |
| | | byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); |
| | | sendRtpItem.setTransaction(transactionByteArray); |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); |
| | | } else { |
| | | // 不在线 拉起 |
| | | notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | } |
| | | |
| | | } else { |
| | | // 其他平台内容 |
| | | otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 通知流上线 |
| | | */ |
| | | private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, |
| | | CallIdHeader callIdHeader, MediaServerItem mediaServerItem, |
| | | int port, Boolean tcpActive, boolean mediaTransmissionTCP, |
| | | String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { |
| | | if ("proxy".equals(gbStream.getStreamType())) { |
| | | // TODO 控制启用以使设备上线 |
| | | logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream()); |
| | |
| | | } |
| | | // 发送redis消息以使设备上线 |
| | | logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream()); |
| | | MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); |
| | | messageForPushChannel.setType(1); |
| | | messageForPushChannel.setGbId(gbStream.getGbId()); |
| | | messageForPushChannel.setApp(gbStream.getApp()); |
| | | messageForPushChannel.setStream(gbStream.getStream()); |
| | | // TODO 获取低负载的节点 |
| | | messageForPushChannel.setMediaServerId(gbStream.getMediaServerId()); |
| | | messageForPushChannel.setPlatFormId(platform.getServerGBId()); |
| | | messageForPushChannel.setPlatFormName(platform.getName()); |
| | | |
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, |
| | | gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), |
| | | platform.getName(), null, gbStream.getMediaServerId()); |
| | | redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); |
| | | // 设置超时 |
| | | dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ |
| | |
| | | } |
| | | }, userSetting.getPlatformPlayTimeout()); |
| | | // 添加监听 |
| | | MediaServerItem finalMediaServerItem = mediaServerItem; |
| | | int finalPort = port; |
| | | boolean finalMediaTransmissionTCP = mediaTransmissionTCP; |
| | | Boolean finalTcpActive = tcpActive; |
| | | mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{ |
| | | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId, |
| | | app, stream, channelId, finalMediaTransmissionTCP); |
| | | |
| | | // 添加在本机上线的通知 |
| | | mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { |
| | | dynamicTask.stop(callIdHeader.getCallId()); |
| | | if (serverId.equals(userSetting.getServerId())) { |
| | | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, |
| | | app, stream, channelId, mediaTransmissionTCP); |
| | | |
| | | if (sendRtpItem == null) { |
| | | logger.warn("服务器端口资源不足"); |
| | |
| | | byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); |
| | | sendRtpItem.setTransaction(transactionByteArray); |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt); |
| | | |
| | | sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); |
| | | } else { |
| | | // 其他平台内容 |
| | | otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | } |
| | | }); |
| | | } |
| | | }else { |
| | | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, |
| | | gbStream.getApp(), gbStream.getStream(), channelId, |
| | | mediaTransmissionTCP); |
| | | } |
| | | |
| | | |
| | | if (sendRtpItem == null) { |
| | | /** |
| | | * 来自其他wvp的推流 |
| | | */ |
| | | private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, |
| | | CallIdHeader callIdHeader, MediaServerItem mediaServerItem, |
| | | int port, Boolean tcpActive, boolean mediaTransmissionTCP, |
| | | String channelId, String addressStr, String ssrc, String requesterId) { |
| | | logger.info("[级联点播]直播流来自其他平台,发送redis消息"); |
| | | // 发送redis消息 |
| | | redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), |
| | | streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, |
| | | channelId, mediaTransmissionTCP, null, responseSendItemMsg -> { |
| | | SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); |
| | | if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { |
| | | logger.warn("服务器端口资源不足"); |
| | | try { |
| | | responseAck(evt, Response.BUSY_HERE); |
| | | } catch (SipException e) { |
| | | e.printStackTrace(); |
| | | } catch (InvalidArgumentException e) { |
| | | e.printStackTrace(); |
| | | } catch (ParseException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return; |
| | | } |
| | | // 收到sendItem |
| | | if (tcpActive != null) { |
| | | sendRtpItem.setTcpActive(tcpActive); |
| | | } |
| | |
| | | byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); |
| | | sendRtpItem.setTransaction(transactionByteArray); |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); |
| | | sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt); |
| | | }, (wvpResult) -> { |
| | | try { |
| | | // 错误 |
| | | if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { |
| | | // 离线 |
| | | // 查询是否在本机上线了 |
| | | StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); |
| | | if (currentStreamPushItem.isStatus()) { |
| | | // 在线状态 |
| | | pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | |
| | | } else { |
| | | // 不在线 拉起 |
| | | notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, |
| | | mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); |
| | | } |
| | | } |
| | | } catch (InvalidArgumentException e) { |
| | | throw new RuntimeException(e); |
| | | } catch (ParseException e) { |
| | | throw new RuntimeException(e); |
| | | } catch (SipException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | try { |
| | | responseAck(evt, Response.BUSY_HERE); |
| | | } catch (SipException e) { |
| | | e.printStackTrace(); |
| | | logger.warn("sdp解析错误"); |
| | | } catch (InvalidArgumentException e) { |
| | | e.printStackTrace(); |
| | | } catch (SdpParseException e) { |
| | | e.printStackTrace(); |
| | | } catch (SdpException e) { |
| | | } catch (ParseException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return; |
| | | }); |
| | | } |
| | | |
| | | public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){ |
| | |
| | | |
| | | private final Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class); |
| | | |
| | | public String method = "REGISTER"; |
| | | public final String method = "REGISTER"; |
| | | |
| | | @Autowired |
| | | private SipConfig sipConfig; |
| | |
| | | public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); |
| | | public static volatile List<String> threadNameList = new ArrayList(); |
| | | private final String cmdType = "RecordInfo"; |
| | | private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; |
| | | |
| | | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | |
| | | @Component |
| | | public class ByeResponseProcessor extends SIPResponseProcessorAbstract { |
| | | |
| | | private String method = "BYE"; |
| | | private final String method = "BYE"; |
| | | |
| | | @Autowired |
| | | private SipLayer sipLayer; |
| | |
| | | @Component |
| | | public class CancelResponseProcessor extends SIPResponseProcessorAbstract { |
| | | |
| | | private String method = "CANCEL"; |
| | | private final String method = "CANCEL"; |
| | | |
| | | @Autowired |
| | | private SipLayer sipLayer; |
| | |
| | | public class InviteResponseProcessor extends SIPResponseProcessorAbstract { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class); |
| | | private String method = "INVITE"; |
| | | private final String method = "INVITE"; |
| | | |
| | | @Autowired |
| | | private SipLayer sipLayer; |
| | |
| | | public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class); |
| | | private String method = "REGISTER"; |
| | | private final String method = "REGISTER"; |
| | | |
| | | @Autowired |
| | | private ISIPCommanderForPlatform sipCommanderForPlatform; |
| | |
| | | if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|
| | | || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|
| | | || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
|
| | | streamPushItem = zlmMediaListManager.addPush(item);
|
| | | item.setSeverId(userSetting.getServerId());
|
| | | zlmMediaListManager.addPush(item);
|
| | | }
|
| | |
|
| | | List<GbStream> gbStreams = new ArrayList<>();
|
| | | if (streamPushItem == null || streamPushItem.getGbId() == null) {
|
| | | GbStream gbStream = storager.getGbStream(app, streamId);
|
| | | gbStreams.add(gbStream);
|
| | | }else {
|
| | | if (streamPushItem.getGbId() != null) {
|
| | | gbStreams.add(streamPushItem);
|
| | | }
|
| | | }
|
| | | if (gbStreams.size() > 0) {
|
| | | // List<GbStream> gbStreams = new ArrayList<>();
|
| | | // if (streamPushItem == null || streamPushItem.getGbId() == null) {
|
| | | // GbStream gbStream = storager.getGbStream(app, streamId);
|
| | | // gbStreams.add(gbStream);
|
| | | // }else {
|
| | | // if (streamPushItem.getGbId() != null) {
|
| | | // gbStreams.add(streamPushItem);
|
| | | // }
|
| | | // }
|
| | | // if (gbStreams.size() > 0) {
|
| | | // eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
|
| | | }
|
| | | // }
|
| | |
|
| | | }else {
|
| | | // 兼容流注销时类型从redis记录获取
|
| | |
| | | import java.util.regex.Matcher; |
| | | import java.util.regex.Pattern; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class ZLMMediaListManager { |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | // StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(transform.getApp(), transform.getStream()); |
| | | List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId()); |
| | | if (gbStreamList != null && gbStreamList.size() == 1) { |
| | | transform.setGbStreamId(gbStreamList.get(0).getGbStreamId()); |
| | |
| | | } |
| | | if (transform != null) { |
| | | if (channelOnlineEvents.get(transform.getGbId()) != null) { |
| | | channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream()); |
| | | channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId()); |
| | | channelOnlineEvents.remove(transform.getGbId()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | storager.updateMedia(transform); |
| | | return transform; |
| | |
| | | |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import org.slf4j.Logger; |
| | |
| | | |
| | | @Autowired |
| | | private ZLMRESTfulUtils zlmresTfulUtils; |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | | |
| | | private int[] portRangeArray = new int[2]; |
| | | |
| | |
| | | sendRtpItem.setTcp(tcp); |
| | | sendRtpItem.setApp("rtp"); |
| | | sendRtpItem.setLocalPort(localPort); |
| | | sendRtpItem.setServerId(userSetting.getServerId()); |
| | | sendRtpItem.setMediaServerId(serverItem.getId()); |
| | | return sendRtpItem; |
| | | } |
| | |
| | | sendRtpItem.setChannelId(channelId); |
| | | sendRtpItem.setTcp(tcp); |
| | | sendRtpItem.setLocalPort(localPort); |
| | | sendRtpItem.setServerId(userSetting.getServerId()); |
| | | sendRtpItem.setMediaServerId(serverItem.getId()); |
| | | return sendRtpItem; |
| | | } |
| | |
| | | package com.genersoft.iot.vmp.media.zlm.dto; |
| | | |
/**
 * Callback invoked when the stream of a channel comes online.
 *
 * @author lin
 */
@FunctionalInterface
public interface ChannelOnlineEvent {

    /**
     * Handles the online notification for one stream.
     *
     * @param app      application name of the stream
     * @param stream   stream id
     * @param serverId server id reported together with the stream; callers compare it
     *                 with the local server id to tell local from remote streams
     */
    void run(String app, String stream, String serverId);
}
| | |
| | | private String originUrl; |
| | | |
| | | /** |
| | | * 服务器id |
| | | * 流媒体服务器id |
| | | */ |
| | | private String mediaServerId; |
| | | |
| | | /** |
| | | * 服务器id |
| | | */ |
| | | private String severId; |
| | | |
| | | /** |
| | | * GMT unix系统时间戳,单位秒 |
| | |
| | | public void setStreamInfo(StreamInfo streamInfo) { |
| | | this.streamInfo = streamInfo; |
| | | } |
| | | |
| | | public String getSeverId() { |
| | | return severId; |
| | | } |
| | | |
| | | public void setSeverId(String severId) { |
| | | this.severId = severId; |
| | | } |
| | | } |
| | |
| | | */ |
| | | private String mediaServerId; |
| | | |
| | | /** |
| | | * 使用的服务ID |
| | | */ |
| | | private String serverId; |
| | | |
| | | public String getVhost() { |
| | | return vhost; |
| | | } |
| | |
| | | public void setMediaServerId(String mediaServerId) { |
| | | this.mediaServerId = mediaServerId; |
| | | } |
| | | |
| | | public String getServerId() { |
| | | return serverId; |
| | | } |
| | | |
| | | public void setServerId(String serverId) { |
| | | this.serverId = serverId; |
| | | } |
| | | } |
| | | |
| | |
| | | private IVideoManagerStorage storager; |
| | | |
| | | |
| | | |
| | | @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次 |
| | | public void execute(){ |
| | | List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); |
| | |
| | | package com.genersoft.iot.vmp.service.bean; |
| | | |
| | | import java.util.stream.Stream; |
| | | |
| | | /** |
| | | * 当上级平台 |
| | | * @author lin |
| | | */ |
| | | public class MessageForPushChannel { |
| | | /** |
| | |
| | | */ |
| | | private String mediaServerId; |
| | | |
| | | /** |
| | | * Static factory: creates a MessageForPushChannel and copies the given values into it. |
| | | * |
| | | * NOTE(review): the serverId argument is accepted but never stored by this method. |
| | | * If the class has a serverId setter (not visible here) it probably should be called - confirm. |
| | | * |
| | | * @param type message type |
| | | * @param app application name |
| | | * @param stream stream ID |
| | | * @param gbId GB ID stored on the message |
| | | * @param platFormId platform ID stored on the message |
| | | * @param platFormName platform name stored on the message |
| | | * @param serverId currently unused by this factory (see note above) |
| | | * @param mediaServerId media server ID stored on the message |
| | | * @return the populated message |
| | | */ |
| | | public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId, |
| | | String platFormId, String platFormName, String serverId, |
| | | String mediaServerId){ |
| | | MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); |
| | | messageForPushChannel.setType(type); |
| | | messageForPushChannel.setGbId(gbId); |
| | | messageForPushChannel.setApp(app); |
| | | messageForPushChannel.setStream(stream); |
| | | messageForPushChannel.setMediaServerId(mediaServerId); |
| | | messageForPushChannel.setPlatFormId(platFormId); |
| | | messageForPushChannel.setPlatFormName(platFormName); |
| | | return messageForPushChannel; |
| | | } |
| | | |
| | | |
| | | public int getType() { |
| | | return type; |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.bean; |
| | | |
| | | /** |
| | | * redis消息:请求下级推送流信息 |
| | | * @author lin |
| | | */ |
| | | public class RequestPushStreamMsg { |
| | | |
| | | |
| | | /** |
| | | * Media server ID (used to look up the media server that handles the stream) |
| | | */ |
| | | private String mediaServerId; |
| | | |
| | | /** |
| | | * Application name |
| | | */ |
| | | private String app; |
| | | |
| | | /** |
| | | * Stream ID |
| | | */ |
| | | private String stream; |
| | | |
| | | /** |
| | | * 目标IP |
| | | */ |
| | | private String ip; |
| | | |
| | | /** |
| | | * 目标端口 |
| | | */ |
| | | private int port; |
| | | |
| | | /** |
| | | * ssrc |
| | | */ |
| | | private String ssrc; |
| | | |
| | | /** |
| | | * 是否使用TCP方式 |
| | | */ |
| | | private boolean tcp; |
| | | |
| | | /** |
| | | * 本地使用的端口 |
| | | */ |
| | | private int srcPort; |
| | | |
| | | /** |
| | | * 发送时,rtp的pt(uint8_t),不传时默认为96 |
| | | */ |
| | | private int pt; |
| | | |
| | | /** |
| | | * 发送时,rtp的负载类型。为true时,负载为ps;为false时,为es; |
| | | */ |
| | | private boolean ps; |
| | | |
| | | /** |
| | | * 是否只有音频 |
| | | */ |
| | | private boolean onlyAudio; |
| | | |
| | | |
| | | public static RequestPushStreamMsg getInstance(String mediaServerId, String app, String stream, String ip, int port, String ssrc, |
| | | boolean tcp, int srcPort, int pt, boolean ps, boolean onlyAudio) { |
| | | RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg(); |
| | | requestPushStreamMsg.setMediaServerId(mediaServerId); |
| | | requestPushStreamMsg.setApp(app); |
| | | requestPushStreamMsg.setStream(stream); |
| | | requestPushStreamMsg.setIp(ip); |
| | | requestPushStreamMsg.setPort(port); |
| | | requestPushStreamMsg.setSsrc(ssrc); |
| | | requestPushStreamMsg.setTcp(tcp); |
| | | requestPushStreamMsg.setSrcPort(srcPort); |
| | | requestPushStreamMsg.setPt(pt); |
| | | requestPushStreamMsg.setPs(ps); |
| | | requestPushStreamMsg.setOnlyAudio(onlyAudio); |
| | | return requestPushStreamMsg; |
| | | } |
| | | |
| | | public String getMediaServerId() { |
| | | return mediaServerId; |
| | | } |
| | | |
| | | public void setMediaServerId(String mediaServerId) { |
| | | this.mediaServerId = mediaServerId; |
| | | } |
| | | |
| | | public String getApp() { |
| | | return app; |
| | | } |
| | | |
| | | public void setApp(String app) { |
| | | this.app = app; |
| | | } |
| | | |
| | | public String getStream() { |
| | | return stream; |
| | | } |
| | | |
| | | public void setStream(String stream) { |
| | | this.stream = stream; |
| | | } |
| | | |
| | | public String getIp() { |
| | | return ip; |
| | | } |
| | | |
| | | public void setIp(String ip) { |
| | | this.ip = ip; |
| | | } |
| | | |
| | | public int getPort() { |
| | | return port; |
| | | } |
| | | |
| | | public void setPort(int port) { |
| | | this.port = port; |
| | | } |
| | | |
| | | public String getSsrc() { |
| | | return ssrc; |
| | | } |
| | | |
| | | public void setSsrc(String ssrc) { |
| | | this.ssrc = ssrc; |
| | | } |
| | | |
| | | public boolean isTcp() { |
| | | return tcp; |
| | | } |
| | | |
| | | public void setTcp(boolean tcp) { |
| | | this.tcp = tcp; |
| | | } |
| | | |
| | | public int getSrcPort() { |
| | | return srcPort; |
| | | } |
| | | |
| | | public void setSrcPort(int srcPort) { |
| | | this.srcPort = srcPort; |
| | | } |
| | | |
| | | public int getPt() { |
| | | return pt; |
| | | } |
| | | |
| | | public void setPt(int pt) { |
| | | this.pt = pt; |
| | | } |
| | | |
| | | public boolean isPs() { |
| | | return ps; |
| | | } |
| | | |
| | | public void setPs(boolean ps) { |
| | | this.ps = ps; |
| | | } |
| | | |
| | | public boolean isOnlyAudio() { |
| | | return onlyAudio; |
| | | } |
| | | |
| | | public void setOnlyAudio(boolean onlyAudio) { |
| | | this.onlyAudio = onlyAudio; |
| | | } |
| | | } |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.bean; |
| | | |
| | | /** |
| | | * redis消息:请求下级回复推送信息 |
| | | * @author lin |
| | | */ |
| | | public class RequestSendItemMsg { |
| | | |
| | | /** |
| | | * 下级服务ID |
| | | */ |
| | | private String serverId; |
| | | |
| | | /** |
| | | * Media server ID (used to look up the media server that handles the stream) |
| | | */ |
| | | private String mediaServerId; |
| | | |
| | | /** |
| | | * Application name |
| | | */ |
| | | private String app; |
| | | |
| | | /** |
| | | * Stream ID |
| | | */ |
| | | private String stream; |
| | | |
| | | /** |
| | | * 目标IP |
| | | */ |
| | | private String ip; |
| | | |
| | | /** |
| | | * 目标端口 |
| | | */ |
| | | private int port; |
| | | |
| | | /** |
| | | * ssrc |
| | | */ |
| | | private String ssrc; |
| | | |
| | | /** |
| | | * 平台国标编号 |
| | | */ |
| | | private String platformId; |
| | | |
| | | /** |
| | | * 平台名称 |
| | | */ |
| | | private String platformName; |
| | | |
| | | /** |
| | | * 通道ID |
| | | */ |
| | | private String channelId; |
| | | |
| | | |
| | | /** |
| | | * 是否使用TCP |
| | | */ |
| | | private Boolean isTcp; |
| | | |
| | | |
| | | |
| | | |
| | | public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port, |
| | | String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) { |
| | | RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg(); |
| | | requestSendItemMsg.setServerId(serverId); |
| | | requestSendItemMsg.setMediaServerId(mediaServerId); |
| | | requestSendItemMsg.setApp(app); |
| | | requestSendItemMsg.setStream(stream); |
| | | requestSendItemMsg.setIp(ip); |
| | | requestSendItemMsg.setPort(port); |
| | | requestSendItemMsg.setSsrc(ssrc); |
| | | requestSendItemMsg.setPlatformId(platformId); |
| | | requestSendItemMsg.setPlatformName(platformName); |
| | | requestSendItemMsg.setChannelId(channelId); |
| | | requestSendItemMsg.setTcp(isTcp); |
| | | |
| | | return requestSendItemMsg; |
| | | } |
| | | |
| | | public String getServerId() { |
| | | return serverId; |
| | | } |
| | | |
| | | public void setServerId(String serverId) { |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | public String getMediaServerId() { |
| | | return mediaServerId; |
| | | } |
| | | |
| | | public void setMediaServerId(String mediaServerId) { |
| | | this.mediaServerId = mediaServerId; |
| | | } |
| | | |
| | | public String getApp() { |
| | | return app; |
| | | } |
| | | |
| | | public void setApp(String app) { |
| | | this.app = app; |
| | | } |
| | | |
| | | public String getStream() { |
| | | return stream; |
| | | } |
| | | |
| | | public void setStream(String stream) { |
| | | this.stream = stream; |
| | | } |
| | | |
| | | public String getIp() { |
| | | return ip; |
| | | } |
| | | |
| | | public void setIp(String ip) { |
| | | this.ip = ip; |
| | | } |
| | | |
| | | public int getPort() { |
| | | return port; |
| | | } |
| | | |
| | | public void setPort(int port) { |
| | | this.port = port; |
| | | } |
| | | |
| | | public String getSsrc() { |
| | | return ssrc; |
| | | } |
| | | |
| | | public void setSsrc(String ssrc) { |
| | | this.ssrc = ssrc; |
| | | } |
| | | |
| | | public String getPlatformId() { |
| | | return platformId; |
| | | } |
| | | |
| | | public void setPlatformId(String platformId) { |
| | | this.platformId = platformId; |
| | | } |
| | | |
| | | public String getPlatformName() { |
| | | return platformName; |
| | | } |
| | | |
| | | public void setPlatformName(String platformName) { |
| | | this.platformName = platformName; |
| | | } |
| | | |
| | | public String getChannelId() { |
| | | return channelId; |
| | | } |
| | | |
| | | public void setChannelId(String channelId) { |
| | | this.channelId = channelId; |
| | | } |
| | | |
| | | public Boolean getTcp() { |
| | | return isTcp; |
| | | } |
| | | |
| | | public void setTcp(Boolean tcp) { |
| | | isTcp = tcp; |
| | | } |
| | | } |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.bean; |
| | | |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | |
| | | /** |
| | | * redis消息:下级回复推送信息 |
| | | * @author lin |
| | | */ |
| | | public class ResponseSendItemMsg { |
| | | |
| | | private SendRtpItem sendRtpItem; |
| | | |
| | | private MediaServerItem mediaServerItem; |
| | | |
| | | public SendRtpItem getSendRtpItem() { |
| | | return sendRtpItem; |
| | | } |
| | | |
| | | public void setSendRtpItem(SendRtpItem sendRtpItem) { |
| | | this.sendRtpItem = sendRtpItem; |
| | | } |
| | | |
| | | public MediaServerItem getMediaServerItem() { |
| | | return mediaServerItem; |
| | | } |
| | | |
| | | public void setMediaServerItem(MediaServerItem mediaServerItem) { |
| | | this.mediaServerItem = mediaServerItem; |
| | | } |
| | | } |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.bean; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | public class WvpRedisMsg { |
| | | |
| | | public static WvpRedisMsg getInstance(String fromId, String toId, String type, String cmd, String serial, String content){ |
| | | WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); |
| | | wvpRedisMsg.setFromId(fromId); |
| | | wvpRedisMsg.setToId(toId); |
| | | wvpRedisMsg.setType(type); |
| | | wvpRedisMsg.setCmd(cmd); |
| | | wvpRedisMsg.setSerial(serial); |
| | | wvpRedisMsg.setContent(content); |
| | | return wvpRedisMsg; |
| | | } |
| | | |
| | | private String fromId; |
| | | |
| | | private String toId; |
| | | /** |
| | | * req 请求, res 回复 |
| | | */ |
| | | private String type; |
| | | private String cmd; |
| | | |
| | | /** |
| | | * 消息的ID |
| | | */ |
| | | private String serial; |
| | | private Object content; |
| | | |
| | | private final static String requestTag = "req"; |
| | | private final static String responseTag = "res"; |
| | | |
| | | public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) { |
| | | WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); |
| | | wvpRedisMsg.setType(requestTag); |
| | | wvpRedisMsg.setFromId(fromId); |
| | | wvpRedisMsg.setToId(toId); |
| | | wvpRedisMsg.setCmd(cmd); |
| | | wvpRedisMsg.setSerial(serial); |
| | | wvpRedisMsg.setContent(content); |
| | | return wvpRedisMsg; |
| | | } |
| | | |
| | | public static WvpRedisMsg getResponseInstance() { |
| | | WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); |
| | | wvpRedisMsg.setType(responseTag); |
| | | return wvpRedisMsg; |
| | | } |
| | | |
| | | public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) { |
| | | WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); |
| | | wvpRedisMsg.setType(responseTag); |
| | | wvpRedisMsg.setFromId(fromId); |
| | | wvpRedisMsg.setToId(toId); |
| | | wvpRedisMsg.setCmd(cmd); |
| | | wvpRedisMsg.setSerial(serial); |
| | | wvpRedisMsg.setContent(content); |
| | | return wvpRedisMsg; |
| | | } |
| | | |
| | | public static boolean isRequest(WvpRedisMsg wvpRedisMsg) { |
| | | return requestTag.equals(wvpRedisMsg.getType()); |
| | | } |
| | | |
| | | public String getSerial() { |
| | | return serial; |
| | | } |
| | | |
| | | public void setSerial(String serial) { |
| | | this.serial = serial; |
| | | } |
| | | |
| | | public String getFromId() { |
| | | return fromId; |
| | | } |
| | | |
| | | public void setFromId(String fromId) { |
| | | this.fromId = fromId; |
| | | } |
| | | |
| | | public String getToId() { |
| | | return toId; |
| | | } |
| | | |
| | | public void setToId(String toId) { |
| | | this.toId = toId; |
| | | } |
| | | |
| | | public String getType() { |
| | | return type; |
| | | } |
| | | |
| | | public void setType(String type) { |
| | | this.type = type; |
| | | } |
| | | |
| | | public String getCmd() { |
| | | return cmd; |
| | | } |
| | | |
| | | public void setCmd(String cmd) { |
| | | this.cmd = cmd; |
| | | } |
| | | |
| | | public Object getContent() { |
| | | return content; |
| | | } |
| | | |
| | | public void setContent(Object content) { |
| | | this.content = content; |
| | | } |
| | | } |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.bean; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | |
| | | public class WvpRedisMsgCmd { |
| | | |
| | | public static final String GET_SEND_ITEM = "GetSendItem"; |
| | | public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; |
| | | |
| | | } |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.impl; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.DynamicTask; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.bean.*; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import com.genersoft.iot.vmp.utils.redis.RedisUtil; |
| | | import com.genersoft.iot.vmp.vmanager.bean.WVPResult; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.connection.Message; |
| | | import org.springframework.data.redis.connection.MessageListener; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.sip.InvalidArgumentException; |
| | | import javax.sip.SipException; |
| | | import java.text.ParseException; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.UUID; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | |
| | | /** |
| | | * 监听下级发送推送信息,并发送国标推流消息上级 |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class RedisGbPlayMsgListener implements MessageListener { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); |
| | | |
| | | public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; |
| | | |
| | | /** |
| | | * Error code: the media server does not exist |
| | | */ |
| | | public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; |
| | | |
| | | /** |
| | | * Error code: offline |
| | | */ |
| | | public static final int ERROR_CODE_OFFLINE = -2; |
| | | |
| | | /** |
| | | * Error code: timed out |
| | | */ |
| | | public static final int ERROR_CODE_TIMEOUT = -3; |
| | | |
| | | private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); |
| | | private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); |
| | | private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | | |
| | | @Autowired |
| | | private RedisUtil redis; |
| | | |
| | | @Autowired |
| | | private ZLMMediaListManager zlmMediaListManager; |
| | | |
| | | @Autowired |
| | | private ZLMRTPServerFactory zlmrtpServerFactory; |
| | | |
| | | @Autowired |
| | | private IMediaServerService mediaServerService; |
| | | |
| | | @Autowired |
| | | private IRedisCatchStorage redisCatchStorage; |
| | | |
| | | @Autowired |
| | | private DynamicTask dynamicTask; |
| | | |
| | | @Autowired |
| | | private ZLMMediaListManager mediaListManager; |
| | | |
| | | @Autowired |
| | | private ZLMHttpHookSubscribe subscribe; |
| | | |
| | | |
| | | public interface PlayMsgCallback{ |
| | | void handler(ResponseSendItemMsg responseSendItemMsg); |
| | | } |
| | | |
| | | public interface PlayMsgCallbackForStartSendRtpStream{ |
| | | void handler(JSONObject jsonObject); |
| | | } |
| | | |
| | | public interface PlayMsgErrorCallback{ |
| | | void handler(WVPResult wvpResult); |
| | | } |
| | | |
| | | /** |
| | | * Handles a message received from redis. |
| | | * Messages whose toId is not this server's ID are ignored. Requests are dispatched to the |
| | | * request handlers; replies are matched to the callbacks registered under the message serial. |
| | | * When a reply is matched, the pending timeout task is stopped and both the success and the |
| | | * error callback are unregistered, so that exactly one of them runs for a request. |
| | | * |
| | | * @param message the redis message; its body is the JSON form of a WvpRedisMsg |
| | | * @param bytes the pattern the listener was registered with (unused) |
| | | */ |
| | | @Override |
| | | public void onMessage(Message message, byte[] bytes) { |
| | | JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class); |
| | | if (msgJSON == null) { |
| | | logger.warn("[收到REDIS通知] 消息解析失败"); |
| | | return; |
| | | } |
| | | WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); |
| | | if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { |
| | | return; |
| | | } |
| | | if (wvpRedisMsg.getCmd() == null) { |
| | | // switching on a null String would throw a NullPointerException |
| | | logger.warn("[收到REDIS通知] 消息缺少cmd: {}", new String(message.getBody())); |
| | | return; |
| | | } |
| | | if (WvpRedisMsg.isRequest(wvpRedisMsg)) { |
| | | logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody())); |
| | | |
| | | switch (wvpRedisMsg.getCmd()){ |
| | | case WvpRedisMsgCmd.GET_SEND_ITEM: |
| | | RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); |
| | | requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); |
| | | break; |
| | | case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: |
| | | RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class); |
| | | requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | |
| | | }else { |
| | | logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody())); |
| | | switch (wvpRedisMsg.getCmd()){ |
| | | case WvpRedisMsgCmd.GET_SEND_ITEM: |
| | | |
| | | WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); |
| | | if (content == null) { |
| | | break; |
| | | } |
| | | String key = wvpRedisMsg.getSerial(); |
| | | switch (content.getCode()) { |
| | | case 0: |
| | | ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); |
| | | // remove (not get) so the callback cannot be invoked a second time |
| | | PlayMsgCallback playMsgCallback = callbacks.remove(key); |
| | | if (playMsgCallback != null) { |
| | | callbacksForError.remove(key); |
| | | // the request is answered: cancel its timeout task |
| | | dynamicTask.stop(key); |
| | | playMsgCallback.handler(responseSendItemMsg); |
| | | } |
| | | break; |
| | | case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: |
| | | case ERROR_CODE_OFFLINE: |
| | | case ERROR_CODE_TIMEOUT: |
| | | PlayMsgErrorCallback errorCallback = callbacksForError.remove(key); |
| | | if (errorCallback != null) { |
| | | callbacks.remove(key); |
| | | dynamicTask.stop(key); |
| | | errorCallback.handler(content); |
| | | } |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | break; |
| | | case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: |
| | | WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); |
| | | if (wvpResult == null) { |
| | | break; |
| | | } |
| | | String serial = wvpRedisMsg.getSerial(); |
| | | switch (wvpResult.getCode()) { |
| | | case 0: |
| | | JSONObject jsonObject = (JSONObject)wvpResult.getData(); |
| | | PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.remove(serial); |
| | | if (playMsgCallback != null) { |
| | | callbacksForError.remove(serial); |
| | | dynamicTask.stop(serial); |
| | | playMsgCallback.handler(jsonObject); |
| | | } |
| | | break; |
| | | case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: |
| | | case ERROR_CODE_OFFLINE: |
| | | case ERROR_CODE_TIMEOUT: |
| | | PlayMsgErrorCallback errorCallback = callbacksForError.remove(serial); |
| | | if (errorCallback != null) { |
| | | // push-stream callbacks live in their own map, not in callbacks |
| | | callbacksForStartSendRtpStream.remove(serial); |
| | | dynamicTask.stop(serial); |
| | | errorCallback.handler(wvpResult); |
| | | } |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Handles a received "request push stream" request: asks the media server to start sending |
| | | * RTP with the requested parameters and replies to the requester with the result. |
| | | * If the media server named in the request is unknown, an error reply with |
| | | * ERROR_CODE_MEDIA_SERVER_NOT_FOUND is sent so the requester does not have to wait for its timeout. |
| | | * |
| | | * @param requestPushStreamMsg the push parameters sent by the requester |
| | | * @param fromId server ID of the requester; the reply is addressed to it |
| | | * @param serial serial of the request, echoed in the reply |
| | | */ |
| | | private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); |
| | | if (mediaInfo == null) { |
| | | logger.info("[回复推流结果] 流媒体{}不存在 ", requestPushStreamMsg.getMediaServerId()); |
| | | |
| | | WVPResult<JSONObject> result = new WVPResult<>(); |
| | | result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); |
| | | result.setMsg("流媒体不存在"); |
| | | |
| | | WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId, |
| | | WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); |
| | | |
| | | JSONObject errorJson = (JSONObject)JSON.toJSON(response); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, errorJson); |
| | | return; |
| | | } |
| | | String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; |
| | | Map<String, Object> param = new HashMap<>(); |
| | | param.put("vhost","__defaultVhost__"); |
| | | param.put("app",requestPushStreamMsg.getApp()); |
| | | param.put("stream",requestPushStreamMsg.getStream()); |
| | | param.put("ssrc", requestPushStreamMsg.getSsrc()); |
| | | param.put("dst_url",requestPushStreamMsg.getIp()); |
| | | param.put("dst_port", requestPushStreamMsg.getPort()); |
| | | param.put("is_udp", is_Udp); |
| | | param.put("src_port", requestPushStreamMsg.getSrcPort()); |
| | | param.put("pt", requestPushStreamMsg.getPt()); |
| | | param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); |
| | | param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); |
| | | JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | // send the result back to the requester |
| | | responsePushStream(jsonObject, fromId, serial); |
| | | } |
| | | |
| | | /** |
| | | * Publishes a successful (code 0) REQUEST_PUSH_STREAM reply carrying the given content. |
| | | * |
| | | * @param content payload placed in the result data |
| | | * @param toId server ID the reply is addressed to |
| | | * @param serial serial of the request being answered |
| | | */ |
| | | private void responsePushStream(JSONObject content, String toId, String serial) { |
| | | WVPResult<JSONObject> okResult = new WVPResult<>(); |
| | | okResult.setCode(0); |
| | | okResult.setData(content); |
| | | |
| | | String localServerId = userSetting.getServerId(); |
| | | WvpRedisMsg reply = WvpRedisMsg.getResponseInstance( |
| | | localServerId, toId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, okResult); |
| | | JSONObject payload = (JSONObject)JSON.toJSON(reply); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, payload); |
| | | } |
| | | |
| | | /** |
| | | * Handles a received GET_SEND_ITEM request. |
| | | * If the media server is unknown, an ERROR_CODE_MEDIA_SERVER_NOT_FOUND reply is published. |
| | | * If the stream is ready, the send item is replied immediately. Otherwise a timeout task and |
| | | * an on_stream_changed hook subscription are set up, and a stream-push request is published |
| | | * via redisCatchStorage so the stream can be brought online. |
| | | * |
| | | * @param content the request parameters |
| | | * @param toId server ID the reply is addressed to (the caller passes the request's fromId) |
| | | * @param serial serial of the request, echoed in every reply |
| | | */ |
| | | private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { |
| | | MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); |
| | | if (mediaServerItem == null) { |
| | | logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId()); |
| | | |
| | | WVPResult<SendRtpItem> result = new WVPResult<>(); |
| | | result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); |
| | | result.setMsg("流媒体不存在"); |
| | | |
| | | WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, |
| | | WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); |
| | | |
| | | JSONObject jsonObject = (JSONObject)JSON.toJSON(response); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); |
| | | return; |
| | | } |
| | | // check whether the stream is currently online |
| | | boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); |
| | | if (streamReady) { |
| | | logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); |
| | | responseSendItem(mediaServerItem, content, toId, serial); |
| | | }else { |
| | | // the stream is offline: |
| | | // publish a redis message so that the device starts pushing |
| | | logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream()); |
| | | |
| | | String taskKey = UUID.randomUUID().toString(); |
| | | // timeout: reply with ERROR_CODE_TIMEOUT if the stream does not come online in time |
| | | // NOTE(review): the hook subscription added below is not removed when this fires - confirm it is cleaned up elsewhere |
| | | dynamicTask.startDelay(taskKey, ()->{ |
| | | logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream()); |
| | | WVPResult<SendRtpItem> result = new WVPResult<>(); |
| | | result.setCode(ERROR_CODE_TIMEOUT); |
| | | WvpRedisMsg response = WvpRedisMsg.getResponseInstance( |
| | | userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result |
| | | ); |
| | | JSONObject jsonObject = (JSONObject)JSON.toJSON(response); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); |
| | | }, userSetting.getPlatformPlayTimeout()); |
| | | |
| | | // subscribe to the stream-changed hook; when it fires, cancel the timeout and reply |
| | | JSONObject subscribeKey = new JSONObject(); |
| | | subscribeKey.put("app", content.getApp()); |
| | | subscribeKey.put("stream", content.getStream()); |
| | | subscribeKey.put("regist", true); |
| | | subscribeKey.put("schema", "rtmp"); |
| | | subscribeKey.put("mediaServerId", mediaServerItem.getId()); |
| | | subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, |
| | | (MediaServerItem mediaServerItemInUse, JSONObject json)->{ |
| | | dynamicTask.stop(taskKey); |
| | | responseSendItem(mediaServerItem, content, toId, serial); |
| | | }); |
| | | |
| | | MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), |
| | | content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), |
| | | content.getMediaServerId()); |
| | | redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); |
| | | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates the SendRtpItem for the request and publishes it, together with the media server |
| | | * item, as a successful (code 0) GET_SEND_ITEM reply. |
| | | * |
| | | * @param mediaServerItem media server the send item is created on |
| | | * @param content the request parameters |
| | | * @param toId server ID the reply is addressed to |
| | | * @param serial serial of the request being answered |
| | | */ |
| | | private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { |
| | | SendRtpItem rtpItem = zlmrtpServerFactory.createSendRtpItem( |
| | | mediaServerItem, |
| | | content.getIp(), content.getPort(), content.getSsrc(), |
| | | content.getPlatformId(), content.getApp(), content.getStream(), |
| | | content.getChannelId(), content.getTcp()); |
| | | |
| | | ResponseSendItemMsg payload = new ResponseSendItemMsg(); |
| | | payload.setSendRtpItem(rtpItem); |
| | | payload.setMediaServerItem(mediaServerItem); |
| | | |
| | | WVPResult<ResponseSendItemMsg> okResult = new WVPResult<>(); |
| | | okResult.setCode(0); |
| | | okResult.setData(payload); |
| | | |
| | | String localServerId = userSetting.getServerId(); |
| | | WvpRedisMsg reply = WvpRedisMsg.getResponseInstance( |
| | | localServerId, toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, okResult); |
| | | JSONObject replyJson = (JSONObject)JSON.toJSON(reply); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, replyJson); |
| | | } |
| | | |
| | | /** |
| | | * Publishes a GET_SEND_ITEM request asking another server to create the send item for a stream. |
| | | * Exactly one of the two callbacks is expected to run: callback when a successful reply arrives, |
| | | * errorCallback when an error reply arrives or no reply is handled before the timeout. |
| | | * The timeout only reports an error if the request is still pending at that moment. |
| | | * |
| | | * @param serverId ID of the server the request is addressed to |
| | | * @param mediaServerId ID of the media server on that server |
| | | * @param app application name |
| | | * @param stream stream ID |
| | | * @param ip destination IP |
| | | * @param port destination port |
| | | * @param ssrc ssrc |
| | | * @param platformId GB ID of the platform |
| | | * @param channelId channel ID |
| | | * @param isTcp whether TCP is used |
| | | * @param platformName name of the platform |
| | | * @param callback invoked with the reply payload on success |
| | | * @param errorCallback invoked with the error result on an error reply or on timeout |
| | | */ |
| | | public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, |
| | | String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { |
| | | // getInstance already stores serverId, no extra setter call is needed |
| | | RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( |
| | | serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName); |
| | | String key = UUID.randomUUID().toString(); |
| | | WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, |
| | | key, requestSendItemMsg); |
| | | |
| | | JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); |
| | | logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject); |
| | | callbacks.put(key, callback); |
| | | callbacksForError.put(key, errorCallback); |
| | | dynamicTask.startDelay(key, ()->{ |
| | | callbacks.remove(key); |
| | | // a handled reply unregisters the error callback; only report a timeout when it is still pending |
| | | PlayMsgErrorCallback pendingErrorCallback = callbacksForError.remove(key); |
| | | if (pendingErrorCallback == null) { |
| | | return; |
| | | } |
| | | WVPResult<Object> wvpResult = new WVPResult<>(); |
| | | wvpResult.setCode(ERROR_CODE_TIMEOUT); |
| | | wvpResult.setMsg("timeout"); |
| | | pendingErrorCallback.handler(wvpResult); |
| | | }, userSetting.getPlatformPlayTimeout()); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); |
| | | } |
| | | |
| | | /** |
| | | * Publishes a REQUEST_PUSH_STREAM request to another server. |
| | | * The callbacks registered for the request are dropped again when the timeout task runs. |
| | | * |
| | | * @param serverId ID of the server the request is addressed to |
| | | * @param param push parameters sent as the request content |
| | | * @param callback registered under the request serial for the reply |
| | | */ |
| | | public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { |
| | | final String serial = UUID.randomUUID().toString(); |
| | | String localServerId = userSetting.getServerId(); |
| | | WvpRedisMsg request = WvpRedisMsg.getRequestInstance( |
| | | localServerId, serverId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, param); |
| | | |
| | | JSONObject payload = (JSONObject)JSON.toJSON(request); |
| | | logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, payload); |
| | | // forget the request once the timeout elapses |
| | | dynamicTask.startDelay(serial, ()->{ |
| | | callbacksForStartSendRtpStream.remove(serial); |
| | | callbacksForError.remove(serial); |
| | | }, userSetting.getPlatformPlayTimeout()); |
| | | callbacksForStartSendRtpStream.put(serial, callback); |
| | | // on an error reply: log it and drop both registrations |
| | | callbacksForError.put(serial, (failure)->{ |
| | | logger.info("[REDIS 请求其他平台推流] 失败: {}", failure.getMsg()); |
| | | callbacksForStartSendRtpStream.remove(serial); |
| | | callbacksForError.remove(serial); |
| | | }); |
| | | redis.convertAndSend(WVP_PUSH_STREAM_KEY, payload); |
| | | } |
| | | } |
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java |
| | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; |
| | | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| | | import org.jetbrains.annotations.NotNull; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | |
| | | import org.springframework.data.redis.connection.MessageListener; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | | * 接收来自redis的GPS更新通知 |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class RedisGPSMsgListener implements MessageListener { |
| | | public class RedisGpsMsgListener implements MessageListener { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(RedisGPSMsgListener.class); |
| | | private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); |
| | | |
| | | @Autowired |
| | | private IRedisCatchStorage redisCatchStorage; |
| | | |
| | | @Override |
| | | public void onMessage(Message message, byte[] bytes) { |
| | | logger.info("收到来自REDIS的GPS通知: {}", new String(message.getBody())); |
| | | public void onMessage(@NotNull Message message, byte[] bytes) { |
| | | if (logger.isDebugEnabled()) { |
| | | logger.debug("收到来自REDIS的GPS通知: {}", new String(message.getBody())); |
| | | } |
| | | GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); |
| | | redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); |
| | | } |
New file |
| | |
| | | package com.genersoft.iot.vmp.service.impl; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; |
| | | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.connection.Message; |
| | | import org.springframework.data.redis.connection.MessageListener; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class RedisStreamMsgListener implements MessageListener { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); |
| | | |
| | | @Autowired |
| | | private ISIPCommander commander; |
| | | |
| | | @Autowired |
| | | private ISIPCommanderForPlatform commanderForPlatform; |
| | | |
| | | @Autowired |
| | | private IVideoManagerStorage storage; |
| | | |
| | | @Autowired |
| | | private UserSetting userSetting; |
| | | |
| | | @Autowired |
| | | private ZLMMediaListManager zlmMediaListManager; |
| | | |
| | | @Override |
| | | public void onMessage(Message message, byte[] bytes) { |
| | | |
| | | JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class); |
| | | if (steamMsgJson == null) { |
| | | logger.warn("[REDIS的ALARM通知]消息解析失败"); |
| | | return; |
| | | } |
| | | String serverId = steamMsgJson.getString("serverId"); |
| | | |
| | | if (userSetting.getServerId().equals(serverId)) { |
| | | // 自己发送的消息忽略即可 |
| | | return; |
| | | } |
| | | logger.info("[REDIS通知] 流变化: {}", new String(message.getBody())); |
| | | String app = steamMsgJson.getString("app"); |
| | | String stream = steamMsgJson.getString("stream"); |
| | | boolean register = steamMsgJson.getBoolean("register"); |
| | | String mediaServerId = steamMsgJson.getString("mediaServerId"); |
| | | MediaItem mediaItem = new MediaItem(); |
| | | mediaItem.setSeverId(serverId); |
| | | mediaItem.setApp(app); |
| | | mediaItem.setStream(stream); |
| | | mediaItem.setRegist(register); |
| | | mediaItem.setMediaServerId(mediaServerId); |
| | | mediaItem.setCreateStamp(System.currentTimeMillis()/1000); |
| | | mediaItem.setAliveSecond(0L); |
| | | mediaItem.setTotalReaderCount("0"); |
| | | mediaItem.setOriginType(0); |
| | | mediaItem.setOriginTypeStr("0"); |
| | | mediaItem.setOriginTypeStr("unknown"); |
| | | |
| | | zlmMediaListManager.addPush(mediaItem); |
| | | |
| | | |
| | | } |
| | | } |
| | |
| | | streamPushItem.setStatus(true); |
| | | streamPushItem.setStreamType("push"); |
| | | streamPushItem.setVhost(item.getVhost()); |
| | | streamPushItem.setServerId(item.getSeverId()); |
| | | return streamPushItem; |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | /** |
| | | * Fetches a single pushed stream. |
| | | * @param app the {@code app} value identifying the stream |
| | | * @param stream the {@code stream} value identifying the stream |
| | | * @return the matching push item (NOTE(review): the result for an unknown app/stream is not visible here; confirm) |
| | | */ |
| | | StreamPushItem getMedia(String app, String stream); |
| | | |
| | | |
| | | /** |
| | | * Clears the list of pushed streams. |
| | | */ |
| | | void clearMediaList(); |
| | |
| | | public interface StreamPushMapper { |
| | | |
| | | // Inserts one row into stream_push using the properties of the given StreamPushItem. |
| | | // NOTE(review): this appears to be a diff listing; the column list and the value list |
| | | // each appear twice below, first without and then with serverId. |
| | | // NOTE(review): every value is spliced in with ${...}, which MyBatis substitutes as raw |
| | | // text (no escaping), so values such as app/stream can alter the statement. Consider |
| | | // switching to #{...} parameters. |
| | | @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " + |
| | | "createStamp, aliveSecond, mediaServerId) VALUES" + |
| | | "createStamp, aliveSecond, mediaServerId, serverId) VALUES" + |
| | | "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " + |
| | | "'${createStamp}', '${aliveSecond}', '${mediaServerId}' )") |
| | | "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )") |
| | | int add(StreamPushItem streamPushItem); |
| | | |
| | | @Update("UPDATE stream_push " + |
| | |
| | | String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_*"; |
| | | List<GPSMsgInfo> result = new ArrayList<>(); |
| | | List<Object> keys = redis.scan(scanKey); |
| | | for (int i = 0; i < keys.size(); i++) { |
| | | String key = (String) keys.get(i); |
| | | for (Object o : keys) { |
| | | String key = (String) o; |
| | | GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key); |
| | | if (!gpsMsgInfo.isStored()) { // 只取没有存过得 |
| | | result.add((GPSMsgInfo)redis.get(key)); |
| | |
| | | /** |
| | | * Logs the app and stream of {@code msg}, then passes the message, converted to a |
| | | * {@code JSONObject}, to {@code redis.convertAndSend} under the key |
| | | * {@code VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED}. |
| | | * <p> |
| | | * NOTE(review): this appears to be a diff listing, so two variants of the log |
| | | * statement (separator "-" and separator "/") are shown one after the other. |
| | | * |
| | | * @param msg the push-channel message to send |
| | | */ |
| | | @Override |
| | | public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { |
| | | String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; |
| | | logger.info("[redis 推流被请求通知] {}: {}-{}", key, msg.getApp(), msg.getStream()); |
| | | logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream()); |
| | | redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Looks up one pushed stream by delegating to {@code streamPushMapper.selectOne}. |
| | |  * |
| | |  * @param app the {@code app} value passed through to the mapper |
| | |  * @param stream the {@code stream} value passed through to the mapper |
| | |  * @return whatever the mapper returns for this app/stream pair |
| | |  */ |
| | | @Override |
| | | public StreamPushItem getMedia(String app, String stream) { |
| | |     final StreamPushItem pushItem = streamPushMapper.selectOne(app, stream); |
| | |     return pushItem; |
| | | } |
| | | |
| | | /** |
| | | * Clears the pushed-stream list by delegating to {@code streamPushMapper.clear()}. |
| | | */ |
| | | @Override |
| | | public void clearMediaList() { |
| | | streamPushMapper.clear(); |
| | | } |
| | |
| | | </template> |
| | | |
| | | <script> |
| | | let webrtcPlayer = null; |
| | | export default { |
| | | name: 'rtcPlayer', |
| | | // Component state: a player reference and a timer handle, both initially null. |
| | | // NOTE(review): the added play/pause rows in this listing use the module-level |
| | | // `webrtcPlayer` variable instead of `this.webrtcPlayer`, so this `webrtcPlayer` |
| | | // field may no longer be used. Confirm against the rest of the component before removing it. |
| | | data() { |
| | | return { |
| | | webrtcPlayer: null, |
| | | timer: null |
| | | }; |
| | | }, |
| | |
| | | }, |
| | | methods: { |
| | | play: function (url) { |
| | | this.webrtcPlayer = new ZLMRTCClient.Endpoint({ |
| | | webrtcPlayer = new ZLMRTCClient.Endpoint({ |
| | | element: document.getElementById('webRtcPlayerBox'),// video 标签 |
| | | debug: true,// 是否打印日志 |
| | | zlmsdpUrl: url,//流地址 |
| | |
| | | videoEnable: false, |
| | | recvOnly: true, |
| | | }) |
| | | this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错 |
| | | webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错 |
| | | console.error('ICE 协商出错') |
| | | this.eventcallbacK("ICE ERROR", "ICE 协商出错") |
| | | }); |
| | | |
| | | this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放 |
| | | webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放 |
| | | console.error('播放成功',e.streams) |
| | | this.eventcallbacK("playing", "播放成功") |
| | | }); |
| | | |
| | | this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败 |
| | | webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败 |
| | | console.error('offer anwser 交换失败',e) |
| | | this.eventcallbacK("OFFER ANSWER ERROR ", "offer anwser 交换失败") |
| | | if (e.code ==-400 && e.msg=="流不存在"){ |
| | |
| | | } |
| | | }); |
| | | |
| | | this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流 |
| | | webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流 |
| | | |
| | | // document.getElementById('selfVideo').srcObject=s; |
| | | this.eventcallbacK("LOCAL STREAM", "获取到了本地流") |
| | |
| | | |
| | | }, |
| | | // Stops playback: if a player exists it is closed and the reference is reset to null. |
| | | // NOTE(review): this appears to be a diff listing; the first three body rows use |
| | | // `this.webrtcPlayer`, the following three use the module-level `webrtcPlayer`. |
| | | pause: function () { |
| | | if (this.webrtcPlayer != null) { |
| | | this.webrtcPlayer.close(); |
| | | this.webrtcPlayer = null; |
| | | if (webrtcPlayer != null) { |
| | | webrtcPlayer.close(); |
| | | webrtcPlayer = null; |
| | | } |
| | | |
| | | }, |