648540858
2024-05-29 764d04b497356ba6bcbb75fd42b51eca750f7223
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -12,6 +12,7 @@
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -81,12 +82,8 @@
    @Override
    public void resetAllCSEQ() {
        String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetting.getServerId() + "_*";
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        for (Object o : keys) {
            String key = (String) o;
            redisTemplate.opsForValue().set(key, 1);
        }
        String key = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetting.getServerId();
        redisTemplate.opsForValue().set(key, 1);
    }
    @Override
@@ -146,15 +143,26 @@
    @Override
    public void updateSendRTPSever(SendRtpItem sendRtpItem) {
        redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
    }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                userSetting.getServerId() + "_"
                + sendRtpItem.getMediaServerId() + "_"
                + sendRtpItem.getPlatformId() + "_"
                + sendRtpItem.getChannelId() + "_"
                + sendRtpItem.getStream() + "_"
                + sendRtpItem.getCallId();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    @Override
    public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
        String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + "*";
        List<SendRtpItem> result = new ArrayList<>();
        List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
        if (!scan.isEmpty()) {
            for (Object o : scan) {
                String key = (String) o;
                result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
            }
        }
        return result;
    }
    @Override
@@ -172,7 +180,7 @@
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + "*_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
@@ -268,9 +276,18 @@
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            for (Object keyStr : scan) {
                logger.info("[删除 redis的SendRTP]: {}", keyStr.toString());
                redisTemplate.delete(keyStr);
            }
        }
    }
    /**
     * 删除RTP推送信息缓存
     */
    @Override
    public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
    }
    @Override
@@ -555,7 +572,7 @@
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        logger.debug("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }
@@ -639,16 +656,28 @@
    }
    @Override
    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
        logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform  != null) {
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                    sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(platform.getId());
            String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
            logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
            redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
        }
    }
    @Override
    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
    public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        msg.setPlatFormIndex(platform.getId());
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
@@ -681,4 +710,30 @@
        logger.info("[redis发送通知] 发送 停止向上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, sendRtpItem);
    }
    @Override
    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
    }
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        logger.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
    @Override
    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
        logger.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
}