From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java | 254 +++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 225 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index e4538e1..a0b3341 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -4,17 +4,20 @@ import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; -import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; -import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IInviteStreamService; @@ -76,6 +79,13 @@ @Autowired private ApplicationEventPublisher applicationEventPublisher; + @Autowired + private MediaConfig mediaConfig; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + /** * 娴佸埌鏉ョ殑澶勭悊 @@ -85,7 +95,9 @@ public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("娴佸彉鍖栵細娉ㄥ唽 app->{}, stream->{}", event.getApp(), event.getStream()); - addCount(event.getSeverId()); + addCount(event.getMediaServer().getId()); + String type = OriginType.values()[event.getMediaInfo().getOriginType()].getType(); + redisCatchStorage.addStream(event.getMediaServer(), type, event.getApp(), event.getStream(), event.getMediaInfo()); } } @@ -97,8 +109,16 @@ public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("娴佸彉鍖栵細娉ㄩ攢, app->{}, stream->{}", event.getApp(), event.getStream()); - removeCount(event.getSeverId()); + removeCount(event.getMediaServer().getId()); + MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( + event.getApp(), event.getStream(), event.getMediaServer().getId()); + if (mediaInfo == null) { + return; + } + String type = OriginType.values()[mediaInfo.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream()); } + } @@ -117,7 +137,7 @@ ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null); } // 鏌ヨredis鏄惁瀛樺湪姝ediaServer - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServer.getId(); + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId(); Boolean hasKey = redisTemplate.hasKey(key); if (hasKey != null && ! hasKey) { redisTemplate.opsForValue().set(key, mediaServer); @@ -128,7 +148,7 @@ @Override public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String presetSsrc, boolean ssrcCheck, - boolean isPlayback, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) { + boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { if (mediaServer == null || mediaServer.getId() == null) { logger.info("[openRTPServer] 澶辫触, mediaServer == null || mediaServer.getId() == null"); return null; @@ -159,18 +179,12 @@ logger.info("[openRTPServer] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); return null; } - rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode); + rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode); } else { rtpServerPort = mediaServer.getRtpProxyPort(); } return new SSRCInfo(rtpServerPort, ssrc, streamId); } - - @Override - public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto) { - return openRTPServer(mediaServer, streamId, ssrc, ssrcCheck, isPlayback, port, onlyAuto, null, 0); - } - @Override public void closeRTPServer(MediaServer mediaServer, String streamId) { @@ -259,7 +273,7 @@ if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerInDataBase.getId())) { ssrcFactory.initMediaServerSSRC(mediaServerInDataBase.getId(),null); } - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerInDataBase.getId(); + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerInDataBase.getId(); redisTemplate.opsForValue().set(key, mediaServerInDataBase); if (mediaServerInDataBase.isStatus()) { resetOnlineServerItem(mediaServerInDataBase); @@ -275,8 +289,8 @@ @Override public List<MediaServer> getAllOnlineList() { List<MediaServer> result = new ArrayList<>(); - List<Object> mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" )); - String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + List<Object> mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + ":" )); + String onlineKey = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); for (Object mediaServerKey : mediaServerKeys) { String key = (String) mediaServerKey; MediaServer mediaServer = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class); @@ -324,14 +338,14 @@ @Override public List<MediaServer> getAllOnline() { - String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); Set<Object> mediaServerIdSet = redisTemplate.opsForZSet().reverseRange(key, 0, -1); List<MediaServer> result = new ArrayList<>(); if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) { for (Object mediaServerId : mediaServerIdSet) { String mediaServerIdStr = (String) mediaServerId; - String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerIdStr; + String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerIdStr; result.add((MediaServer) redisTemplate.opsForValue().get(serverKey)); } } @@ -349,7 +363,7 @@ if (mediaServerId == null) { return null; } - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerId; return JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class); } @@ -361,7 +375,7 @@ @Override public void clearMediaServerForOnline() { - String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); redisTemplate.delete(key); } @@ -385,6 +399,7 @@ logger.info("[娣诲姞濯掍綋鑺傜偣] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); return; } + mediaServerMapper.add(mediaServer); if (mediaServer.isStatus()) { mediaNodeServerService.online(mediaServer); @@ -399,7 +414,7 @@ @Override public void resetOnlineServerItem(MediaServer serverItem) { // 鏇存柊缂撳瓨 - String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); // 浣跨敤zset鐨勫垎鏁颁綔涓哄綋鍓嶅苟鍙戦噺锛� 榛樿鍊艰缃负0 if (redisTemplate.opsForZSet().score(key, serverItem.getId()) == null) { // 涓嶅瓨鍦ㄥ垯璁剧疆榛樿鍊� 宸插瓨鍦ㄥ垯閲嶇疆 redisTemplate.opsForZSet().add(key, serverItem.getId(), 0L); @@ -422,14 +437,14 @@ if (mediaServerId == null) { return; } - String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); redisTemplate.opsForZSet().incrementScore(key, mediaServerId, 1); } @Override public void removeCount(String mediaServerId) { - String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); redisTemplate.opsForZSet().incrementScore(key, mediaServerId, - 1); } @@ -439,7 +454,7 @@ */ @Override public MediaServer getMediaServerForMinimumLoad(Boolean hasAssist) { - String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(); Long size = redisTemplate.opsForZSet().zCard(key); if (size == null || size == 0) { logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣"); @@ -518,8 +533,8 @@ @Override public void delete(String id) { mediaServerMapper.delOne(id); - redisTemplate.opsForZSet().remove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id); - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id; + redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), id); + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + id; redisTemplate.delete(key); // 鍙戦�佽妭鐐圭Щ闄ら�氱煡 MediaServerDeleteEvent event = new MediaServerDeleteEvent(this); @@ -575,6 +590,16 @@ return false; } return mediaNodeServerService.stopSendRtp(mediaInfo, app, stream, ssrc); + } + + @Override + public boolean initStopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaInfo.getType()); + if (mediaNodeServerService == null) { + logger.info("[stopSendRtp] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaInfo.getType()); + return false; + } + return mediaNodeServerService.initStopSendRtp(mediaInfo, app, stream, ssrc); } @Override @@ -716,4 +741,175 @@ } return mediaNodeServerService.getFFmpegCMDs(mediaServer); } + + @Override + public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId) { + return getStreamInfoByAppAndStream(mediaServerItem, app, stream, mediaInfo, null, callId, true); + } + + @Override + public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr, boolean authority) { + StreamInfo streamInfo = null; + if (mediaServerId == null) { + mediaServerId = mediaConfig.getId(); + } + MediaServer mediaInfo = getOne(mediaServerId); + if (mediaInfo == null) { + return null; + } + String calld = null; + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); + if (streamAuthorityInfo != null) { + calld = streamAuthorityInfo.getCallId(); + } + List<StreamInfo> streamInfoList = getMediaList(mediaInfo, app, stream, calld); + if (streamInfoList.isEmpty()) { + return null; + }else { + return streamInfoList.get(0); + } + } + + + + @Override + public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority) { + return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null, authority); + } + + @Override + public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServer, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay) { + StreamInfo streamInfoResult = new StreamInfo(); + streamInfoResult.setStream(stream); + streamInfoResult.setApp(app); + if (addr == null) { + addr = mediaServer.getStreamIp(); + } + + streamInfoResult.setIp(addr); + streamInfoResult.setMediaServerId(mediaServer.getId()); + String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId; + streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app, stream, callIdParam); + streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app, stream, callIdParam); + + + if ("abl".equals(mediaServer.getType())) { + String flvFile = String.format("%s/%s.flv%s", app, stream, callIdParam); + streamInfoResult.setFlv(addr, mediaServer.getFlvPort(),mediaServer.getFlvSSLPort(), flvFile); + streamInfoResult.setWsFlv(addr, mediaServer.getWsFlvPort(),mediaServer.getWsFlvSSLPort(), flvFile); + }else { + String flvFile = String.format("%s/%s.live.flv%s", app, stream, callIdParam); + streamInfoResult.setFlv(addr, mediaServer.getFlvPort(),mediaServer.getFlvSSLPort(), flvFile); + streamInfoResult.setWsFlv(addr, mediaServer.getWsFlvPort(),mediaServer.getWsFlvSSLPort(), flvFile); + } + + streamInfoResult.setFmp4(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setHls(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setTs(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setRtc(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam, isPlay); + + streamInfoResult.setMediaInfo(mediaInfo); + return streamInfoResult; + } + + @Override + public Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + logger.info("[isStreamReady] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); + return false; + } + MediaInfo mediaInfo = mediaNodeServerService.getMediaInfo(mediaServer, rtp, streamId); + return mediaInfo != null; + } + + @Override + public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + logger.info("[startSendRtpPassive] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒癿ediaServer瀵瑰簲鐨勫疄鐜扮被"); + } + mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout); + } + + @Override + public void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + logger.info("[startSendRtpStream] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒癿ediaServer瀵瑰簲鐨勫疄鐜扮被"); + } + logger.info("[寮�濮嬫帹娴乚 rtp/{}, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem); + } + + @Override + public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) { + int localPort = sendRtpPortManager.getNextPort(mediaServer); + if (localPort == 0) { + return null; + } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setPlatformId(deviceId); + sendRtpItem.setDeviceId(deviceId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setTcp(isTcp); + sendRtpItem.setRtcp(rtcp); + sendRtpItem.setApp("rtp"); + sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setMediaServerId(mediaServer.getId()); + return sendRtpItem; + } + + @Override + public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, + String app, String stream, String channelId, boolean tcp, boolean rtcp){ + + int localPort = sendRtpPortManager.getNextPort(serverItem); + if (localPort == 0) { + return null; + } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setApp(app); + sendRtpItem.setStream(stream); + sendRtpItem.setPlatformId(platformId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setTcp(tcp); + sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setMediaServerId(serverItem.getId()); + sendRtpItem.setRtcp(rtcp); + return sendRtpItem; + } + + @Override + public MediaServer getMediaServerByAppAndStream(String app, String stream) { + List<MediaServer> mediaServerList = getAll(); + for (MediaServer mediaServer : mediaServerList) { + MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream); + if (mediaInfo != null) { + return mediaServer; + } + } + return null; + } + + @Override + public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + logger.info("[updateDownloadProcess] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒癿ediaServer瀵瑰簲鐨勫疄鐜扮被"); + } + return mediaNodeServerService.updateDownloadProcess(mediaServer, app, stream); + } } -- Gitblit v1.8.0