From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 160 +++++++++++++++++++++++++++++++++++++++++------------ 1 files changed, 123 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 0f1509f..b743cfc 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -9,9 +9,12 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -79,12 +82,8 @@ @Override public void resetAllCSEQ() { - String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId() + "_*"; - List<Object> keys = RedisUtil.scan(redisTemplate, scanKey); - for (Object o : keys) { - String key = (String) o; - redisTemplate.opsForValue().set(key, 1); - } + String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId(); + redisTemplate.opsForValue().set(key, 1); } @Override @@ -144,15 +143,26 @@ @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + - userSetting.getServerId() + "_" - + sendRtpItem.getMediaServerId() + "_" - + sendRtpItem.getPlatformId() + "_" - + sendRtpItem.getChannelId() + "_" - + sendRtpItem.getStreamId() + "_" - + sendRtpItem.getCallId(); - redisTemplate.opsForValue().set(key, sendRtpItem); + @Override + public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) { + String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*_" + + platformGbId + "_" + + channelId + "_" + + streamId + "_" + + "*"; + List<SendRtpItem> result = new ArrayList<>(); + List<Object> scan = RedisUtil.scan(redisTemplate, scanKey); + if (!scan.isEmpty()) { + for (Object o : scan) { + String key = (String) o; + result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class)); + } + } + return result; } @Override @@ -170,7 +180,7 @@ callId = "*"; } String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" + + "*_*_" + platformGbId + "_" + channelId + "_" + streamId + "_" @@ -184,7 +194,7 @@ } @Override - public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) { + public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) { if (channelId == null) { return null; } @@ -266,9 +276,18 @@ List<Object> scan = RedisUtil.scan(redisTemplate, key); if (scan.size() > 0) { for (Object keyStr : scan) { + logger.info("[鍒犻櫎 redis鐨凷endRTP]锛� {}", keyStr.toString()); redisTemplate.delete(keyStr); } } + } + + /** + * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛� + */ + @Override + public void deleteSendRTPServer(SendRtpItem sendRtpItem) { + deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream()); } @Override @@ -313,14 +332,14 @@ } @Override - public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) { + public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) { // 鏌ユ壘鏄惁浣跨敤浜哻allID StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); if (streamAuthorityInfo != null) { - onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId()); + mediaInfo.setCallId(streamAuthorityInfo.getCallId()); } - redisTemplate.opsForValue().set(key, onStreamChangedHookParam); + redisTemplate.opsForValue().set(key, mediaInfo); } @Override @@ -339,13 +358,13 @@ } @Override - public List<OnStreamChangedHookParam> getStreams(String mediaServerId, String type) { - List<OnStreamChangedHookParam> result = new ArrayList<>(); + public List<MediaInfo> getStreams(String mediaServerId, String type) { + List<MediaInfo> result = new ArrayList<>(); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; List<Object> streams = RedisUtil.scan(redisTemplate, key); for (Object stream : streams) { - OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(stream); - result.add(onStreamChangedHookParam); + MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream); + result.add(mediaInfo); } return result; } @@ -464,14 +483,14 @@ @Override - public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) { + public MediaInfo getStreamInfo(String app, String streamId, String mediaServerId) { String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId; - OnStreamChangedHookParam result = null; + MediaInfo result = null; List<Object> keys = RedisUtil.scan(redisTemplate, scanKey); if (keys.size() > 0) { String key = (String) keys.get(0); - result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class); + result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class); } return result; @@ -553,7 +572,7 @@ @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); + logger.debug("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); redisTemplate.convertAndSend(key, jsonObject); } @@ -609,14 +628,13 @@ @Override public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS; - logger.info("[redis閫氱煡] 鍙戦�� 鎺ㄩ�佽澶�/閫氶亾鐘舵�侊紝 {}/{}-{}", deviceId, channelId, online); StringBuilder msg = new StringBuilder(); msg.append(deviceId); if (channelId != null) { msg.append(":").append(channelId); } msg.append(" ").append(online? "ON":"OFF"); - logger.info("[redis閫氱煡] 鎺ㄩ�佺姸鎬�-> {} ", msg); + logger.info("[redis閫氱煡] 鎺ㄩ�佽澶�/閫氶亾鐘舵��-> {} ", msg); // 浣跨敤 RedisTemplate<Object, Object> 鍙戦�佸瓧绗︿覆娑堟伅浼氬鑷村彂閫佺殑娑堟伅澶氬甫浜嗗弻寮曞彿 stringRedisTemplate.convertAndSend(key, msg.toString()); } @@ -638,16 +656,84 @@ } @Override - public void sendPlatformStartPlayMsg(MessageForPushChannel msg) { - String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鎺ㄦ祦琚笂绾у钩鍙拌鐪� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) { + if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform != null) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), + sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(), + sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY; + logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鎺ㄦ祦琚笂绾у钩鍙拌鐪� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId()); + redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); + } + } + + @Override + public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) { + + MessageForPushChannel msg = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + msg.setPlatFormIndex(platform.getId()); + + String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; + logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } @Override - public void sendPlatformStopPlayMsg(MessageForPushChannel msg) { - String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + public void addPushListItem(String app, String stream, MediaArrivalEvent event) { + String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; + StreamPushItem streamPushItem = StreamPushItem.getInstance(event, userSetting.getServerId()); + redisTemplate.opsForValue().set(key, streamPushItem); + } + + @Override + public StreamPushItem getPushListItem(String app, String stream) { + String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; + return (StreamPushItem)redisTemplate.opsForValue().get(key); + } + + @Override + public void removePushListItem(String app, String stream, String mediaServerId) { + String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; + StreamPushItem param = (StreamPushItem)redisTemplate.opsForValue().get(key); + if (param != null && param.getMediaServerId().equalsIgnoreCase(mediaServerId)) { + redisTemplate.delete(key); + } + + } + + @Override + public void sendPushStreamClose(MessageForPushChannel msg) { + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; + logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鍋滄鍚戜笂绾ф帹娴� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } + + @Override + public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + redisTemplate.opsForValue().set(key, sendRtpItem); + } + + @Override + public SendRtpItem getWaiteSendRtpItem(String app, String stream) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class); + } + + @Override + public void sendStartSendRtp(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + logger.info("[redis鍙戦�侀�氱煡] 閫氱煡鍏朵粬WVP鎺ㄦ祦 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); + } + + @Override + public void sendPushStreamOnline(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; + logger.info("[redis鍙戦�侀�氱煡] 娴佷笂绾� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); + } } -- Gitblit v1.8.0