xubinbin
2023-12-12 42a2772d1aa7493bcc4fac3e24ee8eda4eebc23d
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
old mode 100644 new mode 100755
@@ -1,20 +1,34 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 接收redis发送的结束推流请求
@@ -25,11 +39,26 @@
    // NOTE(review): this field list appears to merge two revisions of the class
    // (diff shown without +/- markers) — confirm against the repository which
    // fields belong to the current revision before editing.

    // Class logger.
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
    // Received redis messages; onMessage offers to it and a task on taskExecutor polls from it.
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    // Executor that runs the queue-draining task submitted from onMessage.
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    // Used by onMessage to look up a push stream by app and stream.
    // NOTE(review): no @Autowired is visible directly above this field in this
    // view — presumably a diff artifact; confirm the field is injected.
    private IStreamPushService streamPushService;
    // Used by onMessage to query and delete send-RTP entries and to send the
    // platform stop-play message.
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    // Used by onMessage to look up the parent platform by its server GB id.
    @Autowired
    private IVideoManagerStorage storager;
    // Used by onMessage to send the BYE command to the parent platform.
    @Autowired
    private ISIPCommanderForPlatform commanderFroPlatform;
    // Supplies the server id placed into the stop-play message.
    @Autowired
    private UserSetting userSetting;
    // Used by onMessage to fetch the media server of a send-RTP item.
    @Autowired
    private IMediaServerService mediaServerService;
    // Used by onMessage to stop the RTP send stream on the media server.
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    // Callbacks looked up in onMessage under the key app + stream (plain concatenation).
    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
@@ -40,30 +69,45 @@
    /**
     * Redis message callback: logs the message body and processes it.
     *
     * NOTE(review): the body below appears to interleave two revisions of this
     * method (a diff rendered without +/- markers). As shown it is not valid
     * Java: braces do not balance, and {@code response} / {@code sendRtpItem}
     * are each used outside the scope that declares them. The comments mark the
     * two variants; confirm against the repository which one is current.
     *
     * @param message the received redis message; its body is logged and parsed as JSON
     * @param bytes   not referenced anywhere in the visible body
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // --- Variant A (queue-based): log, enqueue, drain on the executor ---
        logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        // Emptiness is sampled before the offer, so a drain task is only
        // submitted when the queue had nothing pending.
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                // Keep polling until the queue is empty.
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
                        // Skip messages that did not parse or that lack app/stream.
                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
                            logger.info("[REDIS消息-请求推流结果]:参数不全");
                            continue;
        // --- Variant B (direct handling) ---
        // NOTE(review): indentation resets here; presumably the start of the
        // other revision's body — confirm.
        logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody()));
        // NOTE(review): pushChannel is dereferenced on the next line without a
        // null check — confirm parseObject cannot return null for this input.
        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
        // Look up the push stream named by the message's app and stream.
        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
        if (push != null) {
            // Only act when RTP is currently being sent for this channel's GB id.
            if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
                List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
                        push.getGbId());
                if (sendRtpItems.size() > 0) {
                    for (SendRtpItem sendRtpItem : sendRtpItems) {
                        // NOTE(review): parentPlatform is dereferenced further down
                        // (getName/getId) without a null check — confirm the lookup
                        // cannot return null.
                        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                        // Stop pushing the stream to the upstream platform
                        String streamId = sendRtpItem.getStreamId();
                        // Parameters handed to the media server's stop-send-RTP call.
                        Map<String, Object> param = new HashMap<>();
                        param.put("vhost","__defaultVhost__");
                        param.put("app",sendRtpItem.getApp());
                        param.put("stream",streamId);
                        param.put("ssrc",sendRtpItem.getSsrc());
                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        // The stored send-RTP record is deleted first, then the media
                        // server is asked to stop sending.
                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                        zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                        // Send BYE to the parent platform; a failure is only logged.
                        try {
                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                        }
                        // Check for invite messages that are waiting
                        // (Variant A lines: run the callback registered under app + stream, if any.)
                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
                            responseEvents.get(response.getApp() + response.getStream()).run(response);
                        // (Variant B lines: for push-type streams, build a stop-play
                        // message and send it through redisCatchStorage.)
                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                    sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                        }
                    // (Variant A lines: catch-all around per-message handling; the
                    // failure is logged with the message and the exception.)
                    }catch (Exception e) {
                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
                    }
                }
            });
            }
        }
    }
    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {