From a42dda2bd3cc1cf8c20cc61e7ad9211eadecbaf3 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 24 二月 2022 16:55:06 +0800 Subject: [PATCH] 规范数据库,添加必要约束,优化通道批量导入功能 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 83 +++++++++++++++++++++++++++-------------- 1 files changed, 54 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 8c1239e..4f45813 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -1,14 +1,17 @@ package com.genersoft.iot.vmp.media.zlm; +import java.util.ArrayList; import java.util.List; import java.util.UUID; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -66,7 +69,7 @@ private IMediaService mediaService; @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; + private EventPublisher eventPublisher; @Autowired private ZLMMediaListManager zlmMediaListManager; @@ -183,7 +186,6 @@ ret.put("code", 0); ret.put("msg", "success"); ret.put("enableHls", true); - ret.put("enableMP4", userSetup.isRecordPushLive()); String mediaServerId = json.getString("mediaServerId"); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); if (subscribe != null) { @@ -197,6 +199,11 @@ } String app = json.getString("app"); String stream = json.getString("stream"); + if ("rtp".equals(app)) { + ret.put("enableMP4", userSetup.getRecordSip()); + }else { + ret.put("enableMP4", userSetup.isRecordPushLive()); + } StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream); // 褰曞儚鍥炴斁鏃朵笉杩涜褰曞儚涓嬭浇 @@ -302,7 +309,7 @@ @ResponseBody @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8") public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){ - + if (logger.isDebugEnabled()) { logger.debug("[ ZLM HOOK ]on_stream_changed API璋冪敤锛屽弬鏁帮細" + JSONObject.toJSONString(item)); } @@ -322,14 +329,17 @@ String schema = item.getSchema(); List<MediaItem.MediaTrack> tracks = item.getTracks(); boolean regist = item.isRegist(); - if (tracks != null) { - logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema); - } if ("rtmp".equals(schema)){ + logger.info("on_stream_changed锛氭敞鍐�->{}, app->{}, stream->{}", regist, app, streamId); if (regist) { mediaServerService.addCount(mediaServerId); }else { mediaServerService.removeCount(mediaServerId); + } + if (item.getOriginType() == OriginType.PULL.ordinal() + || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { + // 璁剧疆鎷夋祦浠g悊涓婄嚎/绂荤嚎 + streamProxyService.updateStatus(regist, app, streamId); } if ("rtp".equals(app) && !regist ) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); @@ -344,37 +354,53 @@ if (!"rtp".equals(app)){ String type = OriginType.values()[item.getOriginType()].getType(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem != null){ if (regist) { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); - redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo); + StreamPushItem streamPushItem = null; + redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item); if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - zlmMediaListManager.addPush(item); + streamPushItem = zlmMediaListManager.addPush(item); } - }else { - // 鍏煎娴佹敞閿�鏃剁被鍨嬮敊璇殑闂锛岀瓑zlm鏇存柊鍚庡垹闄� - StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); - if (streamPushItem != null) { - type = "PUSH"; + + List<GbStream> gbStreams = new ArrayList<>(); + if (streamPushItem == null || streamPushItem.getGbId() == null) { + GbStream gbStream = storager.getGbStream(app, streamId); + gbStreams.add(gbStream); }else { - StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId); - if (streamProxyByAppAndStream != null) { - type = "PULL"; + if (streamPushItem.getGbId() != null) { + gbStreams.add(streamPushItem); } } + if (gbStreams.size() > 0) { + eventPublisher.catalogEventPublishForStream(null, gbStreams.toArray(new GbStream[0]), CatalogEvent.ON); + } + + }else { + // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 + MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId); + if (mediaItem != null) { + type = OriginType.values()[mediaItem.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId); + } + GbStream gbStream = storager.getGbStream(app, streamId); + if (gbStream != null) { + eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + } zlmMediaListManager.removeMedia(app, streamId); - redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId); } - // 鍙戦�佹祦鍙樺寲redis娑堟伅 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", app); - jsonObject.put("stream", streamId); - jsonObject.put("register", regist); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + if (type != null) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", app); + jsonObject.put("stream", streamId); + jsonObject.put("register", regist); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } } } } @@ -444,7 +470,6 @@ } return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); } - } /** @@ -467,7 +492,7 @@ if (s.length == 2) { String deviceId = s[0]; String channelId = s[1]; - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { UUID uuid = UUID.randomUUID(); SSRCInfo ssrcInfo; -- Gitblit v1.8.0