648540858
2022-09-14 5b3dc4d5957050c2ce3e3c0013337168d8c9f700
优化点播结束后关闭RTPServer
12个文件已修改
2个文件已添加
1 文件已重命名
222 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -99,6 +99,12 @@
     */
    public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
    /**
     * redis 消息通知平台通知设备推流结果
     */
    public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE";
    /**
     * redis 消息请求所有的在线通道
     */
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -12,7 +12,6 @@
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
@@ -43,7 +42,10 @@
    private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
    @Autowired
    private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
    private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
@@ -81,7 +83,7 @@
        container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -694,7 +694,7 @@
                dialog = streamSession.getDialogByStream(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
            }
            mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
            mediaServerService.closeRTPServer(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
            mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
            streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
            if (dialog == null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -121,7 +121,7 @@
                    StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                    if (streamInfo != null) {
                        redisCatchStorage.stopPlay(streamInfo);
                        mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
                        mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
                    }
                    SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
                    if (ssrcTransactionForPlay != null){
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -24,6 +24,7 @@
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -74,7 +75,7 @@
    private DynamicTask dynamicTask;
    @Autowired
    private SIPCommander cmder;
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private IPlayService playService;
@@ -556,7 +557,6 @@
            otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
        }
    }
    /**
     * 通知流上线
@@ -639,6 +639,23 @@
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                }
            });
            // 添加回复的拒绝或者错误的通知
            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
                if (response.getCode() != 0) {
                    dynamicTask.stop(callIdHeader.getCallId());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    try {
                        responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                    } catch (SipException e) {
                        throw new RuntimeException(e);
                    } catch (InvalidArgumentException e) {
                        throw new RuntimeException(e);
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java
@@ -79,8 +79,8 @@
            List<DeviceChannel> allChannels = new ArrayList<>();
            // 回复平台
            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
            allChannels.add(deviceChannel);
//            DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
//            allChannels.add(deviceChannel);
            // 回复目录
            if (catalogs.size() > 0) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -139,6 +139,7 @@
            param.put("stream_id", streamId);
            JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param);
            if (jsonObject != null ) {
                System.out.println(jsonObject);
                if (jsonObject.getInteger("code") == 0) {
                    result = jsonObject.getInteger("hit") == 1;
                }else {
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -50,7 +50,9 @@
    SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port);
    void closeRTPServer(String deviceId, String channelId, String ssrc);
    void closeRTPServer(MediaServerItem mediaServerItem, String streamId);
    void closeRTPServer(String mediaServerId, String streamId);
    void clearRTPServer(MediaServerItem mediaServerItem);
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -48,6 +48,8 @@
     */
    private String mediaServerId;
    public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId,
                                                    String platFormId, String platFormName, String serverId,
                                                    String mediaServerId){
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java
New file
@@ -0,0 +1,71 @@
package com.genersoft.iot.vmp.service.bean;
/**
 * 当redis回复推流结果上级平台
 * @author lin
 */
public class MessageForPushChannelResponse {
    /**
     * 错误玛
     * 0 成功 1 失败
     */
    private int code;
    /**
     * 错误内容
     */
    private String msg;
    /**
     * 流应用名
     */
    private String app;
    /**
     * 流Id
     */
    private String stream;
    public static MessageForPushChannelResponse getInstance(int code, String msg, String app, String stream){
        MessageForPushChannelResponse messageForPushChannel = new MessageForPushChannelResponse();
        messageForPushChannel.setCode(code);
        messageForPushChannel.setMsg(msg);
        messageForPushChannel.setApp(app);
        messageForPushChannel.setStream(stream);
        return messageForPushChannel;
    }
    public int getCode() {
        return code;
    }
    public void setCode(int code) {
        this.code = code;
    }
    public String getApp() {
        return app;
    }
    public void setApp(String app) {
        this.app = app;
    }
    public String getStream() {
        return stream;
    }
    public void setStream(String stream) {
        this.stream = stream;
    }
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -145,7 +145,7 @@
        if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
            for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
                mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
                mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
                mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
                streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
            }
        }
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -164,16 +164,18 @@
    }
    @Override
    public void closeRTPServer(String deviceId, String channelId, String stream) {
        String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream);
        String ssrc = streamSession.getSSRC(deviceId, channelId, stream);
        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
        if (mediaServerItem != null) {
            String streamId = String.format("%s_%s", deviceId, channelId);
            zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
            releaseSsrc(mediaServerItem.getId(), ssrc);
    public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) {
        if (mediaServerItem == null) {
            return;
        }
        streamSession.remove(deviceId, channelId, stream);
        zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
        releaseSsrc(mediaServerItem.getId(), streamId);
    }
    @Override
    public void closeRTPServer(String mediaServerId, String streamId) {
        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
        closeRTPServer(mediaServerItem, streamId);
    }
    @Override
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -270,7 +270,7 @@
                logger.info("[点播超时] 消息未响应 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                timeoutCallback.run(0, "点播超时");
                mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
                mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
                streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
            }
        }, userSetting.getPlayTimeout());
@@ -333,7 +333,7 @@
                                });
                    }
                    // 关闭rtp server
                    mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                    mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
                    // 重新开启ssrc server
                    mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
@@ -341,7 +341,7 @@
            }
        }, (event) -> {
            dynamicTask.stop(timeOutTaskKey);
            mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
            mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
@@ -445,7 +445,7 @@
                cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
            }else {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
            }
            cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
@@ -533,7 +533,7 @@
                                    });
                                }
                                // 关闭rtp server
                                mediaServerService.closeRTPServer(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                                // 重新开启ssrc server
                                mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
                            }
@@ -593,7 +593,7 @@
                cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
            }else {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
            }
            cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java
New file
@@ -0,0 +1,62 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 接收redis返回的推流结果
 * @author lin
 */
@Component
public class RedisPushStreamResponseListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
    public interface PushStreamResponseEvent{
        void run(MessageForPushChannelResponse response);
    }
    @Override
    public void onMessage(Message message, byte[] bytes) {
        //
        logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);
        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
            logger.info("[REDIS消息-请求推流结果]:参数不全");
            return;
        }
        // 查看正在等待的invite消息
        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
            responseEvents.get(response.getApp() + response.getStream()).run(response);
        }
    }
    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
        responseEvents.put(app + stream, callback);
    }
    public void removeEvent(String app, String stream) {
        responseEvents.remove(app + stream);
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java
@@ -9,7 +9,6 @@
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@@ -23,9 +22,9 @@
 * @Description: 接收redis发送的推流设备列表更新通知
 */
@Component
public class RedisPushStreamListMsgListener implements MessageListener {
public class RedisPushStreamStatusListMsgListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
    @Resource
    private IMediaServerService mediaServerService;