koisi
2024-08-05 8985905bbe9307cbe10b49d9efec082eebb45ada
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
old mode 100644 new mode 100755
@@ -5,17 +5,17 @@
import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.JsonUtil;
@@ -39,6 +39,9 @@
    @Autowired
    private DeviceChannelMapper deviceChannelMapper;
    @Autowired
    private DeviceMapper deviceMapper;
    @Autowired
    private UserSetting userSetting;
@@ -75,12 +78,8 @@
    @Override
    public void resetAllCSEQ() {
        String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetting.getServerId() + "_*";
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        for (Object o : keys) {
            String key = (String) o;
            redisTemplate.opsForValue().set(key, 1);
        }
        String key = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetting.getServerId();
        redisTemplate.opsForValue().set(key, 1);
    }
    @Override
@@ -140,15 +139,26 @@
    @Override
    public void updateSendRTPSever(SendRtpItem sendRtpItem) {
        redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
    }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                userSetting.getServerId() + "_"
                + sendRtpItem.getMediaServerId() + "_"
                + sendRtpItem.getPlatformId() + "_"
                + sendRtpItem.getChannelId() + "_"
                + sendRtpItem.getStreamId() + "_"
                + sendRtpItem.getCallId();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    @Override
    public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
        String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + "*";
        List<SendRtpItem> result = new ArrayList<>();
        List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
        if (!scan.isEmpty()) {
            for (Object o : scan) {
                String key = (String) o;
                result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
            }
        }
        return result;
    }
    @Override
@@ -166,7 +176,7 @@
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + "*_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
@@ -180,7 +190,7 @@
    }
    @Override
    public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
    public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
        if (channelId == null) {
            return null;
        }
@@ -262,9 +272,18 @@
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            for (Object keyStr : scan) {
                logger.info("[删除 redis的SendRTP]: {}", keyStr.toString());
                redisTemplate.delete(keyStr);
            }
        }
    }
    /**
     * 删除RTP推送信息缓存
     */
    @Override
    public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
    }
    @Override
@@ -304,19 +323,19 @@
    @Override
    public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
        String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;
        logger.info("[redis 流变化事件] {}: {}", key, jsonObject.toString());
        logger.info("[redis 流变化事件] 发送 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
    @Override
    public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) {
    public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) {
        // 查找是否使用了callID
        StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId);
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
        if (streamAuthorityInfo != null) {
            onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId());
            mediaInfo.setCallId(streamAuthorityInfo.getCallId());
        }
        redisTemplate.opsForValue().set(key, onStreamChangedHookParam);
        redisTemplate.opsForValue().set(key, mediaInfo);
    }
    @Override
@@ -335,13 +354,13 @@
    }
    @Override
    public List<OnStreamChangedHookParam> getStreams(String mediaServerId, String type) {
        List<OnStreamChangedHookParam> result = new ArrayList<>();
    public List<MediaInfo> getStreams(String mediaServerId, String type) {
        List<MediaInfo> result = new ArrayList<>();
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId;
        List<Object> streams = RedisUtil.scan(redisTemplate, key);
        for (Object stream : streams) {
            OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(stream);
            result.add(onStreamChangedHookParam);
            MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream);
            result.add(mediaInfo);
        }
        return result;
    }
@@ -375,7 +394,8 @@
        for (Object o : keys) {
            String key = (String) o;
            Device device = JsonUtil.redisJsonToObject(redisTemplate, key, Device.class);
            if (Objects.nonNull(device)) { // 只取没有存过得
            if (Objects.nonNull(device)) {
                // 只取没有存过得
                result.add(JsonUtil.redisJsonToObject(redisTemplate, key, Device.class));
            }
        }
@@ -386,14 +406,22 @@
    @Override
    public Device getDevice(String deviceId) {
        String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId() + "_" + deviceId;
        return JsonUtil.redisJsonToObject(redisTemplate, key, Device.class);
        Device device = JsonUtil.redisJsonToObject(redisTemplate, key, Device.class);
        if (device == null){
            device = deviceMapper.getDeviceByDeviceId(deviceId);
            if (device != null) {
                updateDevice(device);
            }
        }
        return device;
    }
    @Override
    public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
        String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_" + gpsMsgInfo.getId();
        Duration duration = Duration.ofSeconds(60L);
        redisTemplate.opsForValue().set(key, gpsMsgInfo, duration); // 默认GPS消息保存1分钟
        redisTemplate.opsForValue().set(key, gpsMsgInfo, duration);
        // 默认GPS消息保存1分钟
    }
    @Override
@@ -451,14 +479,28 @@
    @Override
    public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) {
    public MediaInfo getStreamInfo(String app, String streamId, String mediaServerId) {
        String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
        OnStreamChangedHookParam result = null;
        MediaInfo result = null;
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        if (keys.size() > 0) {
            String key = (String) keys.get(0);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class);
        }
        return result;
    }
    @Override
    public MediaInfo getProxyStream(String app, String streamId) {
        String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_PULL_" + app + "_" + streamId + "_*";
        MediaInfo result = null;
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        if (keys.size() > 0) {
            String key = (String) keys.get(0);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class);
        }
        return result;
@@ -540,14 +582,14 @@
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        logger.info("[redis发送通知] 移动位置 {}: {}", key, jsonObject.toString());
        logger.debug("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
    @Override
    public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
        logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
        logger.info("[redis发送通知] 发送 推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
@@ -555,7 +597,7 @@
    public void sendAlarmMsg(AlarmChannelMessage msg) {
        // 此消息用于对接第三方服务下级来的消息内容
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
        logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg));
        logger.info("[redis发送通知] 发送 报警{}: {}", key, JSON.toJSON(msg));
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
@@ -568,7 +610,7 @@
    @Override
    public void sendStreamPushRequestedMsgForStatus() {
        String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
        logger.info("[redis通知]获取所有推流设备的状态");
        logger.info("[redis通知] 发送 获取所有推流设备的状态");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(key, key);
        redisTemplate.convertAndSend(key, jsonObject);
@@ -587,15 +629,6 @@
    }
    @Override
    public int getGbReceiveCount(String id) {
        String playKey = VideoManagerConstants.PLAYER_PREFIX + "_" + userSetting.getServerId() + "_" + id + "_*";
        String playBackKey = VideoManagerConstants.PLAY_BLACK_PREFIX + "_" + userSetting.getServerId() + "_" + id + "_*";
        String downloadKey = VideoManagerConstants.DOWNLOAD_PREFIX + "_" + userSetting.getServerId() + "_" + id + "_*";
        return RedisUtil.scan(redisTemplate, playKey).size() + RedisUtil.scan(redisTemplate, playBackKey).size() + RedisUtil.scan(redisTemplate, downloadKey).size();
    }
    @Override
    public int getGbSendCount(String id) {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_" + id + "_*";
@@ -605,19 +638,111 @@
    @Override
    public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS;
        if (channelId == null) {
            logger.info("[redis通知] 推送设备状态, {}-{}", deviceId, online);
        }else {
            logger.info("[redis通知] 推送通道状态, {}/{}-{}", deviceId, channelId, online);
        }
        StringBuilder msg = new StringBuilder();
        msg.append(deviceId);
        if (channelId != null) {
            msg.append(":").append(channelId);
        }
        msg.append(" ").append(online? "ON":"OFF");
        logger.info("[redis通知] 推送设备/通道状态-> {} ", msg);
        // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
        stringRedisTemplate.convertAndSend(key, msg.toString());
    }
    @Override
    public void sendChannelAddOrDelete(String deviceId, String channelId, boolean add) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS;
        StringBuilder msg = new StringBuilder();
        msg.append(deviceId);
        if (channelId != null) {
            msg.append(":").append(channelId);
        }
        msg.append(" ").append(add? "ADD":"DELETE");
        logger.info("[redis通知] 推送通道-> {}", msg);
        // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
        stringRedisTemplate.convertAndSend(key, msg.toString());
    }
    @Override
    public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform  != null) {
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                    sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(platform.getId());
            String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
            logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
            redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
        }
    }
    @Override
    public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        msg.setPlatFormIndex(platform.getId());
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void addPushListItem(String app, String stream, MediaArrivalEvent event) {
        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
        redisTemplate.opsForValue().set(key, event.getHookParam());
    }
    @Override
    public OnStreamChangedHookParam getPushListItem(String app, String stream) {
        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
        return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
    }
    @Override
    public void removePushListItem(String app, String stream, String mediaServerId) {
        String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream;
        OnStreamChangedHookParam param = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key);
        if (param != null && param.getMediaServerId().equalsIgnoreCase(mediaServerId)) {
            redisTemplate.delete(key);
        }
    }
    @Override
    public void sendPushStreamClose(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
        logger.info("[redis发送通知] 发送 停止向上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    }
    @Override
    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
    }
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        logger.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
    @Override
    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
        logger.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
}