648540858
2024-04-16 b4168c02cba462571dd3f5bdc1d0b1ffddbc938a
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
@@ -2,12 +2,15 @@
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,13 +38,25 @@
    private UserSetting userSetting;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    /**
     * 当上级点播时,这里负责监听等到流上线,流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
@@ -66,7 +81,17 @@
                                null);
                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
                            SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream());
                            if (sendRtpItem.getSsrc() == null) {
                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
                                sendRtpItem.setSsrc(ssrc);
                                sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
                                sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
                                redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem);
                                // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。
                                redisCatchStorage.sendPushStreamOnline(sendRtpItem);
                            }
                        });