648540858
2023-06-28 def56793ba7c636fbbcabad38e3ac113a3764087
增加上级推流和停止推流的通知
8个文件已修改
1个文件已添加
165 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -63,6 +63,7 @@
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
//        container.addMessageListener(, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,6 +3,8 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -14,6 +16,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -56,6 +59,9 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IVideoManagerStorage storager;
@@ -155,6 +161,13 @@
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM推流接口, 结果: {}",  jsonObject);
            logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
            if (sendRtpItem.getPlayType() == InviteStreamType.PUSH) {
                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(),
                        sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
                        sendRtpItem.getMediaServerId());
                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
            }
        } else {
            logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
            if (sendRtpItem.isOnlyAudio()) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -1,11 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -15,6 +13,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -25,7 +24,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
@@ -52,6 +53,9 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IPlatformService platformService;
    @Autowired
    private IDeviceService deviceService;
    @Autowired
@@ -68,6 +72,9 @@
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private UserSetting userSetting;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -103,6 +110,19 @@
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                        messageForPushChannel.setPlatFormIndex(platform.getId());
                        redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                    }else {
                        logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                    }
                }
                int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
                if (totalReaderCount <= 0) {
                    logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
@@ -119,12 +139,12 @@
                            logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
                        }
                    }
                    if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
                        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
                    }
//                    if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
//                        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
//                                sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
//                                sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
//                        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
//                    }
                }
            }
            // 可能是设备主动停止
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -19,6 +19,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
@@ -463,6 +464,13 @@
                            }
                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                        sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                            }
                        }
                    }
                }
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -34,7 +34,7 @@
    /**
     * 请求的平台自增ID
     */
    private String platFormIndex;
    private int platFormIndex;
    /**
     * 请求平台名称
@@ -132,11 +132,11 @@
        this.mediaServerId = mediaServerId;
    }
    public String getPlatFormIndex() {
    public int getPlatFormIndex() {
        return platFormIndex;
    }
    public void setPlatFormIndex(String platFormIndex) {
    public void setPlatFormIndex(int platFormIndex) {
        this.platFormIndex = platFormIndex;
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -129,6 +129,7 @@
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                default:
                                    break;
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
New file
@@ -0,0 +1,76 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 接收redis发送的结束推流请求
 * @author lin
 */
@Component
public class RedisPushStreamCloseResponseListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
    public interface PushStreamResponseEvent{
        void run(MessageForPushChannelResponse response);
    }
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
                            logger.info("[REDIS消息-请求推流结果]:参数不全");
                            continue;
                        }
                        // 查看正在等待的invite消息
                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
                            responseEvents.get(response.getApp() + response.getStream()).run(response);
                        }
                    }catch (Exception e) {
                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
                    }
                }
            });
        }
    }
    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
        responseEvents.put(app + stream, callback);
    }
    public void removeEvent(String app, String stream) {
        responseEvents.remove(app + stream);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -263,4 +263,8 @@
    void removeAllDevice();
    void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
    /**
     * Sends the notification that an upstream platform has started playing a pushed stream.
     *
     * @param messageForPushChannel message describing the stream and the platform playing it
     */
    void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
    /**
     * Sends the notification that an upstream platform has stopped playing a pushed stream.
     *
     * @param messageForPushChannel message describing the stream and the platform that stopped playing it
     */
    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -925,4 +925,18 @@
        // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
        stringRedisTemplate.convertAndSend(key, msg.toString());
    }
    @Override
    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
        logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
}