From 28b5cc39d0a2d9939f70b4c980a31d9b27fc1e4c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 19 七月 2022 14:27:30 +0800 Subject: [PATCH] 修复更新推流状态sql错误 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 252 ++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 200 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 5ffa467..693dda1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -1,23 +1,33 @@ package com.genersoft.iot.vmp.media.zlm; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; -import com.genersoft.iot.vmp.common.RealVideo; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.session.SsrcUtil; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.impl.RedisCatchStorageImpl; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.checkerframework.checker.units.qual.C; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +/** + * @author lin + */ @Component public class ZLMMediaListManager { @@ -29,60 +39,198 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IVideoManagerStorage storager; - public void updateMediaList() { - JSONObject mediaList = zlmresTfulUtils.getMediaList(); - if (mediaList == null) return; - String dataStr = mediaList.getString("data"); + @Autowired + private GbStreamMapper gbStreamMapper; - Integer code = mediaList.getInteger("code"); - Map<String, RealVideo> result = new HashMap<>(); - if (code == 0 ) { - if (dataStr != null) { - List<MediaItem> mediaItems = JSON.parseObject(dataStr, new TypeReference<List<MediaItem>>() {}); - for (MediaItem item : mediaItems) { - if ("rtp".equals(item.getApp())) { - continue; - } - String key = item.getApp() + "_" + item.getStream(); - RealVideo realVideo = result.get(key); - if (realVideo == null) { - realVideo = new RealVideo(); - realVideo.setApp(item.getApp()); - realVideo.setStream(item.getStream()); - realVideo.setAliveSecond(item.getAliveSecond()); - realVideo.setCreateStamp(item.getCreateStamp()); - realVideo.setOriginSock(item.getOriginSock()); - realVideo.setTotalReaderCount(item.getTotalReaderCount()); - realVideo.setOriginType(item.getOriginType()); - realVideo.setOriginTypeStr(item.getOriginTypeStr()); - realVideo.setOriginUrl(item.getOriginUrl()); - realVideo.setCreateStamp(item.getCreateStamp()); - realVideo.setAliveSecond(item.getAliveSecond()); + @Autowired + private PlatformGbStreamMapper platformGbStreamMapper; - ArrayList<RealVideo.MediaSchema> mediaSchemas = new ArrayList<>(); - realVideo.setSchemas(mediaSchemas); - realVideo.setTracks(item.getTracks()); - realVideo.setVhost(item.getVhost()); - result.put(key, realVideo); - } + @Autowired + private IStreamPushService streamPushService; - RealVideo.MediaSchema mediaSchema = new RealVideo.MediaSchema(); - mediaSchema.setSchema(item.getSchema()); - mediaSchema.setBytesSpeed(item.getBytesSpeed()); - realVideo.getSchemas().add(mediaSchema); - } + @Autowired + private IStreamProxyService streamProxyService; + @Autowired + private StreamPushMapper streamPushMapper; + + @Autowired + private ZLMHttpHookSubscribe subscribe; + + @Autowired + private UserSetting userSetting; + + private Map<String, ChannelOnlineEvent> channelOnlineEvents = new ConcurrentHashMap<>(); + + + public void updateMediaList(MediaServerItem mediaServerItem) { + storager.clearMediaList(); + + // 浣跨敤寮傛鐨勫綋鏃舵洿鏂板獟浣撴祦鍒楄〃 + zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ + if (mediaList == null) { + return; } + String dataStr = mediaList.getString("data"); + + Integer code = mediaList.getInteger("code"); + Map<String, StreamPushItem> result = new HashMap<>(); + List<StreamPushItem> streamPushItems = null; + // 鑾峰彇鎵�鏈夌殑鍥芥爣鍏宠仈 +// List<GbStream> gbStreams = gbStreamMapper.selectAllByMediaServerId(mediaServerItem.getId()); + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem); + } + }else { + logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); + } + + if (streamPushItems != null) { + storager.updateMediaList(streamPushItems); + for (StreamPushItem streamPushItem : streamPushItems) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("app", streamPushItem.getApp()); + jsonObject.put("stream", streamPushItem.getStream()); + jsonObject.put("mediaServerId", mediaServerItem.getId()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject, + (MediaServerItem mediaServerItemInuse, JSONObject response)->{ + updateMedia(mediaServerItem, response.getString("app"), response.getString("stream")); + } + ); + } + } + })); + + } + + public void addMedia(MediaServerItem mediaServerItem, String app, String streamId) { + //浣跨敤寮傛鏇存柊鎺ㄦ祦 + updateMedia(mediaServerItem, app, streamId); + } + + public StreamPushItem addPush(MediaItem mediaItem) { + // 鏌ユ壘姝ょ洿鎾祦鏄惁瀛樺湪redis棰勮gbId + StreamPushItem transform = streamPushService.transform(mediaItem); + StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream()); + transform.setUpdateTime(DateUtil.getNow()); + transform.setPushTime(DateUtil.getNow()); + if (pushInDb == null) { + transform.setCreateTime(DateUtil.getNow()); + streamPushMapper.add(transform); }else { - logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); + streamPushMapper.update(transform); + + +// if (!StringUtils.isEmpty(pushInDb.getGbId())) { +// List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId()); +// if (gbStreamList != null && gbStreamList.size() == 1) { +// transform.setGbStreamId(gbStreamList.get(0).getGbStreamId()); +// transform.setPlatformId(gbStreamList.get(0).getPlatformId()); +// transform.setCatalogId(gbStreamList.get(0).getCatalogId()); +// transform.setGbId(gbStreamList.get(0).getGbId()); +// gbStreamMapper.update(transform); +// streamPushMapper.del(gbStreamList.get(0).getApp(), gbStreamList.get(0).getStream()); +// }else { +// transform.setCreateTime(DateUtil.getNow()); +// transform.setUpdateTime(DateUtil.getNow()); +// gbStreamMapper.add(transform); +// } + // 閫氱煡閫氶亾涓婄嚎 +// if (transform != null) { +// if (channelOnlineEvents.get(transform.getGbId()) != null) { +// channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId()); +// channelOnlineEvents.remove(transform.getGbId()); +// } +// } +// } } - List<RealVideo> realVideos = new ArrayList<>(result.values()); - Collections.sort(realVideos); - redisCatchStorage.updateMediaList(realVideos); + + + return transform; + } + + + public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) { + //浣跨敤寮傛鏇存柊鎺ㄦ祦 + zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{ + + if (json == null) { + return; + } + String dataStr = json.getString("data"); + + Integer code = json.getInteger("code"); + Map<String, StreamPushItem> result = new HashMap<>(); + List<StreamPushItem> streamPushItems = null; + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem); + } + }else { + logger.warn("鏇存柊瑙嗛娴佸け璐ワ紝閿欒code锛� " + code); + } + + if (streamPushItems != null && streamPushItems.size() == 1) { + storager.updateMedia(streamPushItems.get(0)); + } + }); + } + + + public int removeMedia(String app, String streamId) { + // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 + GbStream gbStream = gbStreamMapper.selectOne(app, streamId); + int result; + if (gbStream == null) { + result = storager.removeMedia(app, streamId); + }else { + // TODO 鏆備笉璁剧疆涓虹绾� + result =storager.mediaOffline(app, streamId); + } + return result; + } + + public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) { + this.channelOnlineEvents.put(key,callback); + } + + public void removedChannelOnlineEventLister(String key) { + this.channelOnlineEvents.remove(key); } +// public void clearAllSessions() { +// logger.info("娓呯┖鎵�鏈夊浗鏍囩浉鍏崇殑session"); +// JSONObject allSessionJSON = zlmresTfulUtils.getAllSession(); +// ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); +// HashSet<String> allLocalPorts = new HashSet(); +// if (allSessionJSON.getInteger("code") == 0) { +// JSONArray data = allSessionJSON.getJSONArray("data"); +// if (data.size() > 0) { +// for (int i = 0; i < data.size(); i++) { +// JSONObject sessionJOSN = data.getJSONObject(i); +// Integer local_port = sessionJOSN.getInteger("local_port"); +// if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) && +// !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) && +// !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) && +// !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) && +// !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) && +// !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){ +// allLocalPorts.add(sessionJOSN.getInteger("local_port") + ""); +// } +// } +// } +// } +// if (allLocalPorts.size() > 0) { +// List<String> result = new ArrayList<>(allLocalPorts); +// String localPortSStr = String.join(",", result); +// zlmresTfulUtils.kickSessions(localPortSStr); +// } +// } } -- Gitblit v1.8.0