From b944f8867c78dbe6dd4704115b48beb9f6dc12d9 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期日, 23 四月 2023 15:54:34 +0800 Subject: [PATCH] 支持推流和拉流代理通道状态变化发送通知 --- src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java | 4 + src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 12 ++-- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 120 ++++++++++++++++++++++----------------- src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 8 ++ src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 17 +++-- 5 files changed, 93 insertions(+), 68 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index be73ebd..89ecb18 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -1,14 +1,9 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,12 +11,14 @@ import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * catalog浜嬩欢 @@ -42,6 +39,9 @@ @Autowired private SubscribeHolder subscribeHolder; + + @Autowired + private UserSetting userSetting; @Override public void onApplicationEvent(CatalogEvent event) { @@ -93,6 +93,9 @@ } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ for (GbStream gbStream : event.getGbStreams()) { + if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { + continue; + } DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform); deviceChannelList.add(deviceChannelByStream); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index f9f8fc2..b9a41a5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -118,6 +118,8 @@ // 绂荤嚎 logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); @@ -126,14 +128,14 @@ // 鍙戦�乺edis娑堟伅 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } - }else { - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); } break; case CatalogEvent.VLOST: // 瑙嗛涓㈠け logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); @@ -142,14 +144,14 @@ // 鍙戦�乺edis娑堟伅 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } - }else { - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); } break; case CatalogEvent.DEFECT: // 鏁呴殰 logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); @@ -158,8 +160,6 @@ // 鍙戦�乺edis娑堟伅 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } - }else { - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); } break; case CatalogEvent.ADD: diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 523b10f..405fdd0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -292,22 +293,24 @@ JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo == null) { + logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId()); + return; + } if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - } + subscribe.response(mediaInfo, json); } List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks(); // TODO 閲嶆瀯姝ゅ閫昏緫 - + boolean isPush = false; if (param.isRegist()) { // 澶勭悊娴佹敞鍐岀殑閴存潈淇℃伅 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - + isPush = true; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); @@ -329,7 +332,10 @@ mediaServerService.removeCount(param.getMediaServerId()); } // 璁剧疆鎷夋祦浠g悊涓婄嚎/绂荤嚎 - streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + if (updateStatusResult > 0) { + + } if ("rtp".equals(param.getApp()) && !param.isRegist()) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream()); @@ -337,7 +343,8 @@ redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); } else { - streamInfo = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); + streamInfo = redisCatchStorage.queryPlayback(null, null, + param.getStream(), null); if (streamInfo != null) { redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream(), null); @@ -346,48 +353,50 @@ } else { if (!"rtp".equals(param.getApp())) { String type = OriginType.values()[param.getOriginType()].getType(); - MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - - if (mediaServerItem != null) { - if (param.isRegist()) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); - String callId = null; - if (streamAuthorityInfo != null) { - callId = streamAuthorityInfo.getCallId(); - } - StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, - param.getApp(), param.getStream(), tracks, callId); - param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); - redisCatchStorage.addStream(mediaServerItem, type, param.getApp(), param.getStream(), param); - if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - } - } else { - // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 - OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(param.getApp(), param.getStream(), param.getMediaServerId()); - if (onStreamChangedHookParam != null) { - type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); - redisCatchStorage.removeStream(mediaServerItem.getId(), type, param.getApp(), param.getStream()); - } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { + if (param.isRegist()) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( + param.getApp(), param.getStream()); + String callId = null; + if (streamAuthorityInfo != null) { + callId = streamAuthorityInfo.getCallId(); + } + StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, + param.getApp(), param.getStream(), tracks, callId); + param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); + redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); + if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { + param.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(param); + } + } else { + // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 + OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( + param.getApp(), param.getStream(), param.getMediaServerId()); + if (onStreamChangedHookParam != null) { + type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } - if (type != null) { - // 鍙戦�佹祦鍙樺寲redis娑堟伅 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - } + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); + } + if (type != null) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } } @@ -403,7 +412,8 @@ try { if (platform != null) { commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStreamId()); } else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); } @@ -428,7 +438,8 @@ @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8") public JSONObject onStreamNoneReader(@RequestBody OnStreamNoneReaderHookParam param) { - logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), + param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); // 鍥芥爣绫诲瀷鐨勬祦 @@ -440,7 +451,8 @@ if (streamInfoForPlayCatch != null) { // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ�� if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(streamInfoForPlayCatch.getChannelId()); + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + streamInfoForPlayCatch.getChannelId()); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); @@ -470,7 +482,8 @@ return ret; } // 褰曞儚鍥炴斁 - StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); + StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, + param.getStream(), null); if (streamInfoForPlayBackCatch != null) { if (streamInfoForPlayBackCatch.isPause()) { ret.put("close", false); @@ -491,7 +504,8 @@ return ret; } // 褰曞儚涓嬭浇 - StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, param.getStream(), null); + StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, + param.getStream(), null); // 杩涜褰曞儚涓嬭浇鏃舵棤浜鸿鐪嬩笉鏂祦 if (streamInfoForDownload != null) { ret.put("close", false); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java index a2da561..b327f13 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java @@ -18,6 +18,10 @@ return new HookResult(0, "success"); } + public static HookResult Fail(){ + return new HookResult(-1, "fail"); + } + public int getCode() { return code; } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index facd54c..4c681e8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.*; @@ -42,6 +43,9 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; + + @Autowired + private StringRedisTemplate stringRedisTemplate; @Override public Long getCSEQ() { @@ -913,7 +917,7 @@ msg.append(":").append(channelId); } msg.append(" ").append(online? "ON":"OFF"); - - redisTemplate.convertAndSend(key, msg.toString()); + // 浣跨敤 RedisTemplate<Object, Object> 鍙戦�佸瓧绗︿覆娑堟伅浼氬鑷村彂閫佺殑娑堟伅澶氬甫浜嗗弻寮曞彿 + stringRedisTemplate.convertAndSend(key, msg.toString()); } } -- Gitblit v1.8.0