From 1af77ab5f7c11a4b3d59c1989b51b9fca29679ce Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 18 十月 2022 22:18:49 +0800 Subject: [PATCH] Merge pull request #645 from IKangXu/wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 220 ++++++++++++------------------------------------------ 1 files changed, 50 insertions(+), 170 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 959c06e..d615980 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -1,28 +1,25 @@ package com.genersoft.iot.vmp.media.zlm; -import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; -import org.checkerframework.checker.units.qual.C; +import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; +import java.text.ParseException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * @author lin @@ -57,200 +54,83 @@ private StreamPushMapper streamPushMapper; @Autowired - private ZLMHttpHookSubscribe subscribe; + private ZlmHttpHookSubscribe subscribe; @Autowired private UserSetting userSetting; - private Map<String, ChannelOnlineEvent> channelOnlineEvents = new ConcurrentHashMap<>(); + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired + private IMediaServerService mediaServerService; - public void updateMediaList(MediaServerItem mediaServerItem) { - storager.clearMediaList(); - - // 浣跨敤寮傛鐨勫綋鏃舵洿鏂板獟浣撴祦鍒楄〃 - zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ - if (mediaList == null) { - return; - } - String dataStr = mediaList.getString("data"); - - Integer code = mediaList.getInteger("code"); - Map<String, StreamPushItem> result = new HashMap<>(); - List<StreamPushItem> streamPushItems = null; - // 鑾峰彇鎵�鏈夌殑鍥芥爣鍏宠仈 -// List<GbStream> gbStreams = gbStreamMapper.selectAllByMediaServerId(mediaServerItem.getId()); - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem); - } - }else { - logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); - } - - if (streamPushItems != null) { - storager.updateMediaList(streamPushItems); - for (StreamPushItem streamPushItem : streamPushItems) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("app", streamPushItem.getApp()); - jsonObject.put("stream", streamPushItem.getStream()); - jsonObject.put("mediaServerId", mediaServerItem.getId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject, - (MediaServerItem mediaServerItemInuse, JSONObject response)->{ - updateMedia(mediaServerItem, response.getString("app"), response.getString("stream")); - } - ); - } - } - })); - - } - - public void addMedia(MediaServerItem mediaServerItem, String app, String streamId) { - //浣跨敤寮傛鏇存柊鎺ㄦ祦 - updateMedia(mediaServerItem, app, streamId); - } + private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); public StreamPushItem addPush(MediaItem mediaItem) { - // 鏌ユ壘姝ょ洿鎾祦鏄惁瀛樺湪redis棰勮gbId StreamPushItem transform = streamPushService.transform(mediaItem); - // 浠巗treamId鍙栧嚭鏌ヨ鍏抽敭鍊� - Pattern pattern = Pattern.compile(userSetting.getThirdPartyGBIdReg()); - Matcher matcher = pattern.matcher(mediaItem.getStream());// 鎸囧畾瑕佸尮閰嶇殑瀛楃涓� - String queryKey = null; - if (matcher.find()) { //姝ゅfind锛堬級姣忔琚皟鐢ㄥ悗锛屼細鍋忕Щ鍒颁笅涓�涓尮閰� - queryKey = matcher.group(); + StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream()); + transform.setPushIng(mediaItem.isRegist()); + transform.setUpdateTime(DateUtil.getNow()); + transform.setPushTime(DateUtil.getNow()); + transform.setSelf(userSetting.getServerId().equals(mediaItem.getSeverId())); + if (pushInDb == null) { + transform.setCreateTime(DateUtil.getNow()); + streamPushMapper.add(transform); + }else { + streamPushMapper.update(transform); + gbStreamMapper.updateMediaServer(mediaItem.getApp(), mediaItem.getStream(), mediaItem.getMediaServerId()); } - if (queryKey != null) { - ThirdPartyGB thirdPartyGB = redisCatchStorage.queryMemberNoGBId(queryKey); - if (thirdPartyGB != null && !StringUtils.isEmpty(thirdPartyGB.getNationalStandardNo())) { - transform.setGbId(thirdPartyGB.getNationalStandardNo()); - transform.setName(thirdPartyGB.getName()); + ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); + if ( channelOnlineEventLister != null) { + try { + channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; + } catch (ParseException e) { + logger.error("addPush: ", e); } + removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); } - if (!StringUtils.isEmpty(transform.getGbId())) { - // 濡傛灉杩欎釜鍥芥爣ID宸茬粡缁欎簡鍏朵粬鎺ㄦ祦涓旀祦宸茬绾匡紝鍒欑Щ闄ゅ叾浠栨帹娴� - List<GbStream> gbStreams = gbStreamMapper.selectByGBId(transform.getGbId()); - if (gbStreams.size() > 0) { - for (GbStream gbStream : gbStreams) { - // 鍑虹幇浣跨敤鐩稿悓鍥芥爣Id鐨勮棰戞祦鏃讹紝浣跨敤鏂版祦鏇挎崲鏃ф祦锛� - if (queryKey != null && gbStream.getApp().equals(mediaItem.getApp())) { - Matcher matcherForStream = pattern.matcher(gbStream.getStream()); - String queryKeyForStream = null; - if (matcherForStream.find()) { //姝ゅfind锛堬級姣忔琚皟鐢ㄥ悗锛屼細鍋忕Щ鍒颁笅涓�涓尮閰� - queryKeyForStream = matcherForStream.group(); - } - if (queryKeyForStream == null || !queryKeyForStream.equals(queryKey)) { - // 姝ゆ椂涓嶆槸鍚屼竴涓祦 - gbStreamMapper.del(gbStream.getApp(), gbStream.getStream()); - if (!gbStream.isStatus()) { - streamPushMapper.del(gbStream.getApp(), gbStream.getStream()); - } - } - } - } - } - List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId()); - if (gbStreamList != null && gbStreamList.size() == 1) { - transform.setGbStreamId(gbStreamList.get(0).getGbStreamId()); - transform.setPlatformId(gbStreamList.get(0).getPlatformId()); - transform.setCatalogId(gbStreamList.get(0).getCatalogId()); - transform.setGbId(gbStreamList.get(0).getGbId()); - gbStreamMapper.update(transform); - streamPushMapper.del(gbStreamList.get(0).getApp(), gbStreamList.get(0).getStream()); - }else { - transform.setCreateStamp(System.currentTimeMillis()); - gbStreamMapper.add(transform); - } - if (transform != null) { - if (channelOnlineEvents.get(transform.getGbId()) != null) { - channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId()); - channelOnlineEvents.remove(transform.getGbId()); - } - } - } - - storager.updateMedia(transform); return transform; } - - public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) { - //浣跨敤寮傛鏇存柊鎺ㄦ祦 - zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{ - - if (json == null) { - return; - } - String dataStr = json.getString("data"); - - Integer code = json.getInteger("code"); - Map<String, StreamPushItem> result = new HashMap<>(); - List<StreamPushItem> streamPushItems = null; - if (code == 0 ) { - if (dataStr != null) { - streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem); + public void sendStreamEvent(String app, String stream, String mediaServerId) { + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + // 鏌ョ湅鎺ㄦ祦鐘舵�� + if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { + ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); + if (channelOnlineEventLister != null) { + try { + channelOnlineEventLister.run(app, stream, mediaServerId); + } catch (ParseException e) { + logger.error("sendStreamEvent: ", e); } - }else { - logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); + removedChannelOnlineEventLister(app, stream); } - - if (streamPushItems != null && streamPushItems.size() == 1) { - storager.updateMedia(streamPushItems.get(0)); - } - }); + } } - public int removeMedia(String app, String streamId) { // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 - StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId); - int result = 0; - if (streamProxyItem == null) { + GbStream gbStream = gbStreamMapper.selectOne(app, streamId); + int result; + if (gbStream == null) { result = storager.removeMedia(app, streamId); }else { - // TODO 鏆備笉璁剧疆涓虹绾� - result =storager.mediaOutline(app, streamId); + result =storager.mediaOffline(app, streamId); } return result; } - public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) { - this.channelOnlineEvents.put(key,callback); + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { + this.channelOnPublishEvents.put(app + "_" + stream, callback); } - public void removedChannelOnlineEventLister(String key) { - this.channelOnlineEvents.remove(key); + public void removedChannelOnlineEventLister(String app, String stream) { + this.channelOnPublishEvents.remove(app + "_" + stream); } + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { + return this.channelOnPublishEvents.get(app + "_" + stream); + } - -// public void clearAllSessions() { -// logger.info("娓呯┖鎵�鏈夊浗鏍囩浉鍏崇殑session"); -// JSONObject allSessionJSON = zlmresTfulUtils.getAllSession(); -// ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); -// HashSet<String> allLocalPorts = new HashSet(); -// if (allSessionJSON.getInteger("code") == 0) { -// JSONArray data = allSessionJSON.getJSONArray("data"); -// if (data.size() > 0) { -// for (int i = 0; i < data.size(); i++) { -// JSONObject sessionJOSN = data.getJSONObject(i); -// Integer local_port = sessionJOSN.getInteger("local_port"); -// if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){ -// allLocalPorts.add(sessionJOSN.getInteger("local_port") + ""); -// } -// } -// } -// } -// if (allLocalPorts.size() > 0) { -// List<String> result = new ArrayList<>(allLocalPorts); -// String localPortSStr = String.join(",", result); -// zlmresTfulUtils.kickSessions(localPortSStr); -// } -// } } -- Gitblit v1.8.0