From 1768565ce045d4beddccc82d10b533ea3022cf8d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 18 三月 2024 23:42:18 +0800 Subject: [PATCH] 优化媒体节点服务的代码结构 --- src/main/java/com/genersoft/iot/vmp/media/impl/MediaServerServiceImpl.java | 145 ++++++++++++++++++++++++++---------------------- 1 files changed, 79 insertions(+), 66 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/impl/MediaServerServiceImpl.java similarity index 83% rename from src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java rename to src/main/java/com/genersoft/iot/vmp/media/impl/MediaServerServiceImpl.java index 190d665..4e8ed27 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/impl/MediaServerServiceImpl.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.service.impl; +package com.genersoft.iot.vmp.media.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; @@ -12,11 +12,12 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.IMediaNodeServerService; import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.media.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -25,30 +26,23 @@ import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; -import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import java.io.File; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; /** * 濯掍綋鏈嶅姟鍣ㄨ妭鐐圭鐞� @@ -83,9 +77,6 @@ private AssistRESTfulUtils assistRESTfulUtils; @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired private MediaServerMapper mediaServerMapper; @Autowired @@ -113,20 +104,16 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - + private Map<String, IMediaNodeServerService> nodeServerServiceMap; /** * 鍒濆鍖� */ @Override - public void updateVmServer(List<MediaServerItem> mediaServerItemList) { - logger.info("[zlm] 缂撳瓨鍒濆鍖� "); + public void updateVmServer(List<MediaServerItem> mediaServerItemList) { + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 缂撳瓨鍒濆鍖� "); for (MediaServerItem mediaServerItem : mediaServerItemList) { if (ObjectUtils.isEmpty(mediaServerItem.getId())) { continue; @@ -173,7 +160,12 @@ } int rtpServerPort; if (mediaServerItem.isRtpEnable()) { - rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode); + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); + if (mediaNodeServerService == null) { + logger.info("[openRTPServer] 澶辫触, mediaServerItem鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServerItem.getType()); + return null; + } + rtpServerPort = mediaNodeServerService.createRTPServer(mediaServerItem, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode); } else { rtpServerPort = mediaServerItem.getRtpProxyPort(); } @@ -191,7 +183,12 @@ if (mediaServerItem == null) { return; } - zlmServerFactory.closeRtpServer(mediaServerItem, streamId); + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); + if (mediaNodeServerService == null) { + logger.info("[closeRTPServer] 澶辫触, mediaServerItem鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServerItem.getType()); + return; + } + mediaNodeServerService.closeRtpServer(mediaServerItem, streamId); } @Override @@ -200,21 +197,42 @@ callback.run(false); return; } - zlmServerFactory.closeRtpServer(mediaServerItem, streamId, callback); + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); + if (mediaNodeServerService == null) { + logger.info("[closeRTPServer] 澶辫触, mediaServerItem鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServerItem.getType()); + return; + } + mediaNodeServerService.closeRtpServer(mediaServerItem, streamId, callback); } @Override public void closeRTPServer(String mediaServerId, String streamId) { MediaServerItem mediaServerItem = this.getOne(mediaServerId); - if (mediaServerItem != null && mediaServerItem.isRtpEnable()) { + if (mediaServerItem == null) { + return; + } + if (mediaServerItem.isRtpEnable()) { closeRTPServer(mediaServerItem, streamId); } - zlmresTfulUtils.closeStreams(mediaServerItem, "rtp", streamId); + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); + if (mediaNodeServerService == null) { + logger.info("[closeRTPServer] 澶辫触, mediaServerItem鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServerItem.getType()); + return; + } + mediaNodeServerService.closeStreams(mediaServerItem, "rtp", streamId); } @Override public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) { - return zlmServerFactory.updateRtpServerSSRC(mediaServerItem, streamId, ssrc); + if (mediaServerItem == null) { + return false; + } + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); + if (mediaNodeServerService == null) { + logger.info("[updateRtpServerSSRC] 澶辫触, mediaServerItem鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServerItem.getType()); + return false; + } + return mediaNodeServerService.updateRtpServerSSRC(mediaServerItem, streamId, ssrc); } @Override @@ -227,12 +245,11 @@ } /** - * zlm 閲嶅惎鍚庨噸缃粬鐨勬帹娴佷俊鎭紝 TODO 缁欐鍦ㄤ娇鐢ㄧ殑璁惧鍙戦�佸仠姝㈠懡浠� + * 濯掍綋鏈嶅姟鑺傜偣 閲嶅惎鍚庨噸缃粬鐨勬帹娴佷俊鎭紝 TODO 缁欐鍦ㄤ娇鐢ㄧ殑璁惧鍙戦�佸仠姝㈠懡浠� */ @Override public void clearRTPServer(MediaServerItem mediaServerItem) { ssrcFactory.reset(mediaServerItem.getId()); - } @@ -301,7 +318,7 @@ } /** - * 鑾峰彇鍗曚釜zlm鏈嶅姟鍣� + * 鑾峰彇鍗曚釜濯掍綋鏈嶅姟鑺傜偣鏈嶅姟鍣� * @param mediaServerId 鏈嶅姟id * @return MediaServerItem */ @@ -331,24 +348,20 @@ mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); mediaServerItem.setHookAliveInterval(30f); - JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); - if (responseJSON != null) { - JSONArray data = responseJSON.getJSONArray("data"); - if (data != null && data.size() > 0) { - ZLMServerConfig zlmServerConfig= JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); - if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(),"淇濆瓨澶辫触锛屽獟浣撴湇鍔D [ " + zlmServerConfig.getGeneralMediaServerId() + " ] 宸插瓨鍦紝璇蜂慨鏀瑰獟浣撴湇鍔″櫒閰嶇疆"); - } - mediaServerItem.setId(zlmServerConfig.getGeneralMediaServerId()); - zlmServerConfig.setIp(mediaServerItem.getIp()); - mediaServerMapper.add(mediaServerItem); - zlmServerOnline(zlmServerConfig); - }else { - throw new ControllerException(ErrorCode.ERROR100.getCode(),"杩炴帴澶辫触"); - } - + if (mediaServerItem.getType() == null) { + logger.info("[娣诲姞濯掍綋鑺傜偣] 澶辫触, mediaServerItem鐨勭被鍨嬶細涓虹┖"); + return; + } + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); + if (mediaNodeServerService == null) { + logger.info("[娣诲姞濯掍綋鑺傜偣] 澶辫触, mediaServerItem鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServerItem.getType()); + return; + } + if (mediaNodeServerService.checkNodeId(mediaServerItem)) { + mediaServerMapper.add(mediaServerItem); + mediaNodeServerService.online(mediaServerItem); }else { - throw new ControllerException(ErrorCode.ERROR100.getCode(),"杩炴帴澶辫触"); + throw new ControllerException(ErrorCode.ERROR100.getCode(),"淇濆瓨澶辫触锛屽獟浣撴湇鍔D [ " + mediaServerItem.getId() + " ] 宸插瓨鍦紝璇蜂慨鏀瑰獟浣撴湇鍔″櫒閰嶇疆"); } } @@ -364,7 +377,7 @@ TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); int delResult = mediaServerMapper.delDefault(); if (delResult == 0) { - logger.error("绉婚櫎鏁版嵁搴撻粯璁lm鑺傜偣澶辫触"); + logger.error("绉婚櫎鏁版嵁搴撻粯璁ゅ獟浣撴湇鍔¤妭鐐硅妭鐐瑰け璐�"); //浜嬪姟鍥炴粴 dataSourceTransactionManager.rollback(transactionStatus); return 0; @@ -378,19 +391,19 @@ } /** - * 澶勭悊zlm涓婄嚎 - * @param zlmServerConfig zlm涓婄嚎鎼哄甫鐨勫弬鏁� + * 澶勭悊濯掍綋鏈嶅姟鑺傜偣涓婄嚎 + * @param zlmServerConfig 濯掍綋鏈嶅姟鑺傜偣涓婄嚎鎼哄甫鐨勫弬鏁� */ @Override public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()); if (serverItem == null) { - logger.warn("[鏈敞鍐岀殑zlm] 鎷掓帴鎺ュ叆锛歿}鏉ヨ嚜{}锛歿}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); - logger.warn("璇锋鏌LM鐨�<general.mediaServerId>閰嶇疆鏄惁涓嶹VP鐨�<media.id>涓�鑷�"); + logger.warn("[鏈敞鍐岀殑濯掍綋鏈嶅姟鑺傜偣] 鎷掓帴鎺ュ叆锛歿}鏉ヨ嚜{}锛歿}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); + logger.warn("璇锋鏌ュ獟浣撴湇鍔¤妭鐐圭殑ID閰嶇疆鏄惁涓嶹VP鐨�<media.id>涓�鑷�"); return; }else { - logger.info("[ZLM] 姝e湪杩炴帴 : {} -> {}:{}", + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 姝e湪杩炴帴 : {} -> {}:{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); } serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval()); @@ -418,7 +431,7 @@ serverItem.setStatus(true); if (ObjectUtils.isEmpty(serverItem.getId())) { - logger.warn("[鏈敞鍐岀殑zlm] serverItem缂哄皯ID锛� 鏃犳硶鎺ュ叆锛歿}锛歿}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); + logger.warn("[鏈敞鍐岀殑濯掍綋鏈嶅姟鑺傜偣] serverItem缂哄皯ID锛� 鏃犳硶鎺ュ叆锛歿}锛歿}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); return; } mediaServerMapper.update(serverItem); @@ -436,9 +449,9 @@ final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); dynamicTask.stop(zlmKeepaliveKey); dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval().intValue() + 5) * 1000); - publisher.zlmOnlineEventPublish(serverItem.getId()); + publisher.mediaServerOnlineEventPublish(serverItem.getId()); - logger.info("[ZLM] 杩炴帴鎴愬姛 {} - {}:{} ", + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 杩炴帴鎴愬姛 {} - {}:{} ", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); } @@ -452,12 +465,12 @@ @Override public void run() { - logger.info("[zlm蹇冭烦鍒版湡]锛�" + serverItem.getId()); + logger.info("[濯掍綋鏈嶅姟鑺傜偣蹇冭烦鍒版湡]锛�" + serverItem.getId()); // 鍙戣捣http璇锋眰楠岃瘉zlm鏄惁纭疄鏃犳硶杩炴帴锛屽鏋滅‘瀹炴棤娉曡繛鎺ュ垯鍙戦�佺绾夸簨浠讹紝鍚﹀垯涓嶄綔澶勭悊 JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem); if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) { - logger.info("[zlm蹇冭烦鍒版湡]锛歿}楠岃瘉鍚巣lm浠嶅湪绾匡紝鎭㈠蹇冭烦淇℃伅,璇锋鏌lm鏄惁鍙互姝e父鍚憌vp鍙戦�佸績璺�", serverItem.getId()); - // 娣诲姞zlm淇℃伅 + logger.info("[濯掍綋鏈嶅姟鑺傜偣蹇冭烦鍒版湡]锛歿}楠岃瘉鍚庡獟浣撴湇鍔¤妭鐐逛粛鍦ㄧ嚎锛屾仮澶嶅績璺充俊鎭�,璇锋鏌ュ獟浣撴湇鍔¤妭鐐规槸鍚﹀彲浠ユ甯稿悜wvp鍙戦�佸績璺�", serverItem.getId()); + // 娣诲姞濯掍綋鏈嶅姟鑺傜偣淇℃伅 updateMediaServerKeepalive(serverItem.getId(), null); }else { publisher.zlmOfflineEventPublish(serverItem.getId()); @@ -556,13 +569,13 @@ } /** - * 瀵箊lm鏈嶅姟鍣ㄨ繘琛屽熀纭�閰嶇疆 + * 瀵瑰獟浣撴湇鍔¤妭鐐规湇鍔″櫒杩涜鍩虹閰嶇疆 * @param mediaServerItem 鏈嶅姟ID - * @param restart 鏄惁閲嶅惎zlm + * @param restart 鏄惁閲嶅惎濯掍綋鏈嶅姟鑺傜偣 */ @Override public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) { - logger.info("[ZLM] 姝e湪璁剧疆 锛歿} -> {}:{}", + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 姝e湪璁剧疆 锛歿} -> {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); String protocol = sslEnabled ? "https" : "http"; String hookPrefix = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort); @@ -612,17 +625,17 @@ if (responseJSON != null && responseJSON.getInteger("code") == 0) { if (restart) { - logger.info("[ZLM] 璁剧疆鎴愬姛,寮�濮嬮噸鍚互淇濊瘉閰嶇疆鐢熸晥 {} -> {}:{}", + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 璁剧疆鎴愬姛,寮�濮嬮噸鍚互淇濊瘉閰嶇疆鐢熸晥 {} -> {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); zlmresTfulUtils.restartServer(mediaServerItem); }else { - logger.info("[ZLM] 璁剧疆鎴愬姛 {} -> {}:{}", + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 璁剧疆鎴愬姛 {} -> {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); } }else { - logger.info("[ZLM] 璁剧疆zlm澶辫触 {} -> {}:{}", + logger.info("[濯掍綋鏈嶅姟鑺傜偣] 璁剧疆濯掍綋鏈嶅姟鑺傜偣澶辫触 {} -> {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); } @@ -701,11 +714,11 @@ // 缂撳瓨涓嶅瓨鍦紝浠庢暟鎹簱鏌ヨ锛屽鏋滄暟鎹簱涓嶅瓨鍦ㄥ垯鏄敊璇殑 mediaServerItem = getOneFromDatabase(mediaServerId); if (mediaServerItem == null) { - logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅] 娴佸獟浣搟}灏氭湭鍔犲叆浣跨敤,璇锋鏌ヨ妭鐐逛腑鏄惁鍚湁姝ゆ祦濯掍綋 ", mediaServerId); + logger.warn("[鏇存柊濯掍綋鏈嶅姟鑺傜偣 淇濇椿淇℃伅] 娴佸獟浣搟}灏氭湭鍔犲叆浣跨敤,璇锋鏌ヨ妭鐐逛腑鏄惁鍚湁姝ゆ祦濯掍綋 ", mediaServerId); return; } - // zlm杩炴帴閲嶈瘯 - logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]灏濊瘯閾炬帴zml id {}", mediaServerId); + // 濯掍綋鏈嶅姟鑺傜偣杩炴帴閲嶈瘯 + logger.warn("[鏇存柊濯掍綋鏈嶅姟鑺傜偣 淇濇椿淇℃伅]灏濊瘯閾炬帴zml id {}", mediaServerId); ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); redisTemplate.opsForValue().set(key, mediaServerItem); -- Gitblit v1.8.0