From 66eda32ab97d6e94e9f274d6faa4df586c452dfb Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期日, 25 六月 2023 10:18:29 +0800 Subject: [PATCH] 优化端口预占用 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 188 +++++++++++++++++++++------------------------- 1 files changed, 87 insertions(+), 101 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 77add28..8e9b3d0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -1,22 +1,30 @@ package com.genersoft.iot.vmp.media.zlm; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.text.ParseException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +/** + * @author lin + */ @Component public class ZLMMediaListManager { @@ -29,7 +37,7 @@ private IRedisCatchStorage redisCatchStorage; @Autowired - private IVideoManagerStorager storager; + private IVideoManagerStorage storager; @Autowired private GbStreamMapper gbStreamMapper; @@ -41,112 +49,90 @@ private IStreamPushService streamPushService; @Autowired - private ZLMHttpHookSubscribe subscribe; + private IStreamProxyService streamProxyService; + @Autowired + private StreamPushMapper streamPushMapper; - public void updateMediaList() { - storager.clearMediaList(); + @Autowired + private ZlmHttpHookSubscribe subscribe; - // 浣跨敤寮傛鐨勫綋鏃舵洿鏂板獟浣撴祦鍒楄〃 - zlmresTfulUtils.getMediaList((mediaList ->{ - if (mediaList == null) return; - String dataStr = mediaList.getString("data"); + @Autowired + private UserSetting userSetting; - Integer code = mediaList.getInteger("code"); - Map<String, StreamPushItem> result = new HashMap<>(); - List<StreamPushItem> streamPushItems = null; - // 鑾峰彇鎵�鏈夌殑鍥芥爣鍏宠仈 - List<GbStream> gbStreams = gbStreamMapper.selectAll(); - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = streamPushService.handleJSON(dataStr); - } - }else { - logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); - } + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; - if (streamPushItems != null) { - storager.updateMediaList(streamPushItems); - for (StreamPushItem streamPushItem : streamPushItems) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("app", streamPushItem.getApp()); - jsonObject.put("stream", streamPushItem.getStream()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject,(response)->{ - updateMedia(response.getString("app"), response.getString("stream")); - }); - } - } - })); + @Autowired + private IMediaServerService mediaServerService; - } + private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); - public void addMedia(String app, String streamId) { - //浣跨敤寮傛鏇存柊鎺ㄦ祦 - updateMedia(app, streamId); - } - - - public void updateMedia(String app, String streamId) { - //浣跨敤寮傛鏇存柊鎺ㄦ祦 - zlmresTfulUtils.getMediaList(app, streamId, "rtmp", json->{ - - if (json == null) return; - String dataStr = json.getString("data"); - - Integer code = json.getInteger("code"); - Map<String, StreamPushItem> result = new HashMap<>(); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = streamPushService.handleJSON(dataStr); - } - }else { - logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); - } - - if (streamPushItems != null && streamPushItems.size() == 1) { - storager.updateMedia(streamPushItems.get(0)); - } - }); - } - - - public void removeMedia(String app, String streamId) { - // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 - StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId); - if (streamProxyItem == null) { - storager.removeMedia(app, streamId); + public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { + StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); + StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); + transform.setPushIng(onStreamChangedHookParam.isRegist()); + transform.setUpdateTime(DateUtil.getNow()); + transform.setPushTime(DateUtil.getNow()); + transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); + if (pushInDb == null) { + transform.setCreateTime(DateUtil.getNow()); + streamPushMapper.add(transform); }else { - storager.mediaOutline(app, streamId); + streamPushMapper.update(transform); + gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); + } + ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); + if ( channelOnlineEventLister != null) { + try { + channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; + } catch (ParseException e) { + logger.error("addPush: ", e); + } + removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); + } + return transform; + } + + public void sendStreamEvent(String app, String stream, String mediaServerId) { + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + // 鏌ョ湅鎺ㄦ祦鐘舵�� + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady != null && streamReady) { + ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); + if (channelOnlineEventLister != null) { + try { + channelOnlineEventLister.run(app, stream, mediaServerId); + } catch (ParseException e) { + logger.error("sendStreamEvent: ", e); + } + removedChannelOnlineEventLister(app, stream); + } } } - public void clearAllSessions() { - logger.info("娓呯┖鎵�鏈夊浗鏍囩浉鍏崇殑session"); - JSONObject allSessionJSON = zlmresTfulUtils.getAllSession(); - ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); - HashSet<String> allLocalPorts = new HashSet(); - if (allSessionJSON.getInteger("code") == 0) { - JSONArray data = allSessionJSON.getJSONArray("data"); - if (data.size() > 0) { - for (int i = 0; i < data.size(); i++) { - JSONObject sessionJOSN = data.getJSONObject(i); - Integer local_port = sessionJOSN.getInteger("local_port"); - if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) && - !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) && - !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) && - !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) && - !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) && - !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){ - allLocalPorts.add(sessionJOSN.getInteger("local_port") + ""); - } - } - } + public int removeMedia(String app, String streamId) { + // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 + GbStream gbStream = gbStreamMapper.selectOne(app, streamId); + int result; + if (gbStream == null) { + result = storager.removeMedia(app, streamId); + }else { + result =storager.mediaOffline(app, streamId); } - if (allLocalPorts.size() > 0) { - List<String> result = new ArrayList<>(allLocalPorts); - String localPortSStr = String.join(",", result); - zlmresTfulUtils.kickSessions(localPortSStr); - } + return result; } + + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { + this.channelOnPublishEvents.put(app + "_" + stream, callback); + } + + public void removedChannelOnlineEventLister(String app, String stream) { + this.channelOnPublishEvents.remove(app + "_" + stream); + } + + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { + return this.channelOnPublishEvents.get(app + "_" + stream); + } + } -- Gitblit v1.8.0