From 7d530099878dbcf474182bbbfc694ecfe74c9fbd Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期日, 07 四月 2024 20:06:48 +0800 Subject: [PATCH] 优化推流通知 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 22 +++-- src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java | 5 - src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 29 +++++++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 53 ++++++++----- src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 2 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 6 + src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 2 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java | 98 +++++++++++------------ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 4 - 10 files changed, 132 insertions(+), 91 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 361bdc6..30193d2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -305,4 +305,33 @@ public void setReceiveStream(String receiveStream) { this.receiveStream = receiveStream; } + + @Override + public String toString() { + return "SendRtpItem{" + + "ip='" + ip + '\'' + + ", port=" + port + + ", ssrc='" + ssrc + '\'' + + ", platformId='" + platformId + '\'' + + ", deviceId='" + deviceId + '\'' + + ", app='" + app + '\'' + + ", channelId='" + channelId + '\'' + + ", status=" + status + + ", stream='" + stream + '\'' + + ", tcp=" + tcp + + ", tcpActive=" + tcpActive + + ", localPort=" + localPort + + ", mediaServerId='" + mediaServerId + '\'' + + ", serverId='" + serverId + '\'' + + ", CallId='" + CallId + '\'' + + ", fromTag='" + fromTag + '\'' + + ", toTag='" + toTag + '\'' + + ", pt=" + pt + + ", usePs=" + usePs + + ", onlyAudio=" + onlyAudio + + ", rtcp=" + rtcp + + ", playType=" + playType + + ", receiveStream='" + receiveStream + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 7004820..242e5ef 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -116,7 +116,7 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); - if (mediaInfo == null) { + if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3205498..74fad54 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -575,14 +575,20 @@ } if ("push".equals(gbStream.getStreamType())) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 鎺ㄦ祦鐘舵�� - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 鏈帹娴� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if (streamPushItem != null) { + // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 + OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + if (pushListItem != null) { + StreamPushItem transform = streamPushService.transform(pushListItem); + transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + // 鎺ㄦ祦鐘舵�� + pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else { + // 鏈帹娴� 鎷夎捣 + notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 5feb306..3a7f452 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -509,32 +509,43 @@ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); + if (sendRtpItem == null) { + continue; + } - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� - logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (sendRtpItem.getApp().equals(param.getApp())) { + logger.info(sendRtpItem.toString()); + if (userSetting.getServerId().equals(sendRtpItem.getServerId())) { + // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦 +// redisCatchStorage.sendRtp + }else { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); + + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� + logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } } } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); } + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index c8d203f..8c0e9b0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1170,7 +1170,7 @@ dynamicTask.startDelay(key, ()->{ logger.info("[璇煶骞挎挱]绛夊緟invite娑堟伅瓒呮椂锛歿}/{}", device.getDeviceId(), channelId); stopAudioBroadcast(device.getDeviceId(), channelId); - }, 2000); + }, 10*1000); }, eventResultForError -> { // 鍙戦�佸け璐� logger.error("璇煶骞挎挱鍙戦�佸け璐ワ細 {}:{}", channelId, eventResultForError.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 59c5ace..f0230f7 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.utils.DateUtil; @@ -333,8 +331,6 @@ result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); } - System.out.println("addStreamProxyToZlm===="); - System.out.println(result); if (result != null && result.getInteger("code") == 0) { JSONObject data = result.getJSONObject("data"); if (data == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java index f5f2948..0912f0b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -1,11 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; - import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -41,52 +37,52 @@ @Override public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); - continue; - } - String serverId = steamMsgJson.getString("serverId"); - - if (userSetting.getServerId().equals(serverId)) { - // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 - continue; - } - logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); - onStreamChangedHookParam.setSeverId(serverId); - onStreamChangedHookParam.setApp(app); - onStreamChangedHookParam.setStream(stream); - onStreamChangedHookParam.setRegist(register); - onStreamChangedHookParam.setMediaServerId(mediaServerId); - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); - onStreamChangedHookParam.setAliveSecond(0L); - onStreamChangedHookParam.setTotalReaderCount("0"); - onStreamChangedHookParam.setOriginType(0); - onStreamChangedHookParam.setOriginTypeStr("0"); - onStreamChangedHookParam.setOriginTypeStr("unknown"); - if (register) { - zlmMediaListManager.addPush(onStreamChangedHookParam); - }else { - zlmMediaListManager.removeMedia(app, stream); - } - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e); - } - } - }); - } +// boolean isEmpty = taskQueue.isEmpty(); +// taskQueue.offer(message); +// if (isEmpty) { +// taskExecutor.execute(() -> { +// while (!taskQueue.isEmpty()) { +// Message msg = taskQueue.poll(); +// try { +// JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); +// if (steamMsgJson == null) { +// logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); +// continue; +// } +// String serverId = steamMsgJson.getString("serverId"); +// +// if (userSetting.getServerId().equals(serverId)) { +// // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 +// continue; +// } +// logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); +// String app = steamMsgJson.getString("app"); +// String stream = steamMsgJson.getString("stream"); +// boolean register = steamMsgJson.getBoolean("register"); +// String mediaServerId = steamMsgJson.getString("mediaServerId"); +// OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); +// onStreamChangedHookParam.setSeverId(serverId); +// onStreamChangedHookParam.setApp(app); +// onStreamChangedHookParam.setStream(stream); +// onStreamChangedHookParam.setRegist(register); +// onStreamChangedHookParam.setMediaServerId(mediaServerId); +// onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); +// onStreamChangedHookParam.setAliveSecond(0L); +// onStreamChangedHookParam.setTotalReaderCount("0"); +// onStreamChangedHookParam.setOriginType(0); +// onStreamChangedHookParam.setOriginTypeStr("0"); +// onStreamChangedHookParam.setOriginTypeStr("unknown"); +// if (register) { +// zlmMediaListManager.addPush(onStreamChangedHookParam); +// }else { +// zlmMediaListManager.removeMedia(app, stream); +// } +// }catch (Exception e) { +// logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); +// logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e); +// } +// } +// }); +// } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 78fd280..06b1d56 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -211,5 +211,7 @@ void addPushListItem(String app, String stream, OnStreamChangedHookParam param); + OnStreamChangedHookParam getPushListItem(String app, String stream); + void removePushListItem(String app, String stream, String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 18a037d..a1a0033 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -657,6 +657,12 @@ } @Override + public OnStreamChangedHookParam getPushListItem(String app, String stream) { + String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; + return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key); + } + + @Override public void removePushListItem(String app, String stream, String mediaServerId) { String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; OnStreamChangedHookParam param = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java index b3d1990..b37a3d9 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java @@ -1,12 +1,8 @@ package com.genersoft.iot.vmp.vmanager.cloudRecord; import com.alibaba.fastjson2.JSONArray; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.ICloudRecordService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -22,7 +18,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; -- Gitblit v1.8.0