koisi
2024-08-05 8985905bbe9307cbe10b49d9efec082eebb45ada
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -5,11 +5,10 @@
import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -79,12 +78,8 @@
    @Override
    public void resetAllCSEQ() {
        String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetting.getServerId() + "_*";
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        for (Object o : keys) {
            String key = (String) o;
            redisTemplate.opsForValue().set(key, 1);
        }
        String key = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetting.getServerId();
        redisTemplate.opsForValue().set(key, 1);
    }
    @Override
@@ -144,15 +139,26 @@
    @Override
    public void updateSendRTPSever(SendRtpItem sendRtpItem) {
        redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
    }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                userSetting.getServerId() + "_"
                + sendRtpItem.getMediaServerId() + "_"
                + sendRtpItem.getPlatformId() + "_"
                + sendRtpItem.getChannelId() + "_"
                + sendRtpItem.getStream() + "_"
                + sendRtpItem.getCallId();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    @Override
    public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
        String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + "*";
        List<SendRtpItem> result = new ArrayList<>();
        List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
        if (!scan.isEmpty()) {
            for (Object o : scan) {
                String key = (String) o;
                result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
            }
        }
        return result;
    }
    @Override
@@ -170,7 +176,7 @@
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + "*_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
@@ -266,9 +272,18 @@
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            for (Object keyStr : scan) {
                logger.info("[删除 redis的SendRTP]: {}", keyStr.toString());
                redisTemplate.delete(keyStr);
            }
        }
    }
    /**
     * 删除RTP推送信息缓存
     */
    @Override
    public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
    }
    @Override
@@ -313,14 +328,14 @@
    }
    @Override
    public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) {
    public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) {
        // 查找是否使用了callID
        StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId);
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
        if (streamAuthorityInfo != null) {
            onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId());
            mediaInfo.setCallId(streamAuthorityInfo.getCallId());
        }
        redisTemplate.opsForValue().set(key, onStreamChangedHookParam);
        redisTemplate.opsForValue().set(key, mediaInfo);
    }
    @Override
@@ -339,13 +354,13 @@
    }
    @Override
    public List<OnStreamChangedHookParam> getStreams(String mediaServerId, String type) {
        List<OnStreamChangedHookParam> result = new ArrayList<>();
    public List<MediaInfo> getStreams(String mediaServerId, String type) {
        List<MediaInfo> result = new ArrayList<>();
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId;
        List<Object> streams = RedisUtil.scan(redisTemplate, key);
        for (Object stream : streams) {
            OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(stream);
            result.add(onStreamChangedHookParam);
            MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream);
            result.add(mediaInfo);
        }
        return result;
    }
@@ -464,14 +479,28 @@
    @Override
    public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) {
    public MediaInfo getStreamInfo(String app, String streamId, String mediaServerId) {
        String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
        OnStreamChangedHookParam result = null;
        MediaInfo result = null;
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        if (keys.size() > 0) {
            String key = (String) keys.get(0);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class);
        }
        return result;
    }
    @Override
    public MediaInfo getProxyStream(String app, String streamId) {
        String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_PULL_" + app + "_" + streamId + "_*";
        MediaInfo result = null;
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        if (keys.size() > 0) {
            String key = (String) keys.get(0);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class);
        }
        return result;
@@ -553,7 +582,7 @@
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        logger.debug("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
@@ -637,23 +666,35 @@
    }
    @Override
    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
        logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform  != null) {
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                    sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(platform.getId());
            String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
            logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
            redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
        }
    }
    @Override
    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
    public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        msg.setPlatFormIndex(platform.getId());
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void addPushListItem(String app, String stream, OnStreamChangedHookParam param) {
    public void addPushListItem(String app, String stream, MediaArrivalEvent event) {
        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
        redisTemplate.opsForValue().set(key, param);
        redisTemplate.opsForValue().set(key, event.getHookParam());
    }
    @Override