648540858
2023-06-29 f62bf7b2c6239f2c67f5d9019f8302c8d441f870
Merge branch '2.6.8' into wvp-28181-2.0

# Conflicts:
# src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
# src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
# src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
# src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
# src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
# src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
19个文件已修改
2个文件已添加
618 ■■■■ 已修改文件
doc/_sidebar.md 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 176 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 120 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java 117 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
doc/_sidebar.md
@@ -1,6 +1,7 @@
<!-- 侧边栏 -->
* **编译与部署**
  * [测试](_content/introduction/test.md)
  * [编译](_content/introduction/compile.md)
  * [配置](_content/introduction/config.md)
  * [部署](_content/introduction/deployment.md)
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -100,6 +100,21 @@
     */
    public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
    /**
     * Redis message: notification about an upstream platform starting to watch a stream.
     */
    public static final String VM_MSG_STREAM_START_PLAY_NOTIFY = "VM_MSG_STREAM_START_PLAY_NOTIFY";
    /**
     * Redis message: notification about an upstream platform stopping watching a stream.
     */
    public static final String VM_MSG_STREAM_STOP_PLAY_NOTIFY = "VM_MSG_STREAM_STOP_PLAY_NOTIFY";
    /**
     * Redis message: received request to close a pushed stream.
     */
    public static final String VM_MSG_STREAM_PUSH_CLOSE_REQUESTED = "VM_MSG_STREAM_PUSH_CLOSE_REQUESTED";
    /**
     * redis 消息通知平台通知设备推流结果
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -46,6 +46,9 @@
    @Autowired
    private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
    @Autowired
    private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
    /**
     * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
@@ -67,6 +70,7 @@
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
        container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -64,7 +64,7 @@
        try {
            sipStack = (SipStackImpl)SipFactory.getInstance().createSipStack(DefaultProperties.getProperties(monitorIp, userSetting.getSipLog()));
        } catch (PeerUnavailableException e) {
            logger.error("[Sip Server] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp);
            logger.error("[SIP SERVER] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp);
            return;
        }
@@ -76,12 +76,12 @@
            tcpSipProvider.addSipListener(sipProcessorObserver);
            tcpSipProviderMap.put(monitorIp, tcpSipProvider);
            logger.info("[Sip Server] tcp://{}:{} 启动成功", monitorIp, port);
            logger.info("[SIP SERVER] tcp://{}:{} 启动成功", monitorIp, port);
        } catch (TransportNotSupportedException
                 | TooManyListenersException
                 | ObjectInUseException
                 | InvalidArgumentException e) {
            logger.error("[Sip Server] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
            logger.error("[SIP SERVER] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
                    , monitorIp, port);
        }
@@ -93,12 +93,12 @@
            udpSipProviderMap.put(monitorIp, udpSipProvider);
            logger.info("[Sip Server] udp://{}:{} 启动成功", monitorIp, port);
            logger.info("[SIP SERVER] udp://{}:{} 启动成功", monitorIp, port);
        } catch (TransportNotSupportedException
                 | TooManyListenersException
                 | ObjectInUseException
                 | InvalidArgumentException e) {
            logger.error("[Sip Server] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
            logger.error("[SIP SERVER] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
                    , monitorIp, port);
        }
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -283,7 +283,7 @@
        viaHeader.setRPort();
        viaHeaders.add(viaHeader);
        // from
        SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(platform.getDeviceGBId(),
        SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sendRtpItem.getChannelId(),
                platform.getDeviceIp() + ":" + platform.getDevicePort());
        Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
        FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sendRtpItem.getToTag());
@@ -296,13 +296,10 @@
        MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
        // CSeq
        CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE);
        MessageFactoryImpl messageFactory = (MessageFactoryImpl) SipFactory.getInstance().createMessageFactory();
        // 设置编码, 防止中文乱码
        messageFactory.setDefaultContentEncodingCharset("gb2312");
        CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId());
        request = (SIPRequest) messageFactory.createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader,
        request = (SIPRequest) SipFactory.getInstance().createMessageFactory().createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader,
                toHeader, viaHeaders, maxForwards);
        request.addHeader(SipUtils.createUserAgentHeader(gitUtil));
@@ -310,6 +307,7 @@
        String sipAddress = platform.getDeviceIp() + ":" + platform.getDevicePort();
        Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory()
                .createSipURI(platform.getDeviceGBId(), sipAddress));
        request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress));
        return request;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -371,7 +371,6 @@
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            errorEvent.response(e);
        }), e -> {
            // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
            ResponseEvent responseEvent = (ResponseEvent) e.event;
            SIPResponse response = (SIPResponse) responseEvent.getResponse();
            streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -215,6 +215,8 @@
                }else {
                    if (channel.getChannelId().length() != 20) {
                        catalogXml.append("</Item>\r\n");
                        logger.warn("[编号长度异常] {} 长度错误,请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length());
                        catalogXml.append("</Item>\r\n");
                        continue;
                    }
                    switch (Integer.parseInt(channel.getChannelId().substring(10, 13))){
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,6 +3,8 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -14,6 +16,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -56,6 +59,9 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IVideoManagerStorage storager;
@@ -155,6 +161,13 @@
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM推流接口, 结果: {}",  jsonObject);
            logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
            if (sendRtpItem.getPlayType() == InviteStreamType.PUSH) {
                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(),
                        sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
                        sendRtpItem.getMediaServerId());
                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
            }
        } else {
            logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
            if (sendRtpItem.isOnlyAudio()) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -2,12 +2,15 @@
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -15,9 +18,11 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -32,10 +37,10 @@
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
@@ -60,7 +65,13 @@
    private IInviteStreamService inviteStreamService;
    @Autowired
    private IPlatformService platformService;
    @Autowired
    private IDeviceService deviceService;
    @Autowired
    private IDeviceChannelService channelService;
    @Autowired
    private IVideoManagerStorage storager;
@@ -80,6 +91,9 @@
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private UserSetting userSetting;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -92,93 +106,91 @@
     */
    @Override
    public void process(RequestEvent evt) {
        SIPRequest request = (SIPRequest) evt.getRequest();
        try {
            responseAck((SIPRequest) evt.getRequest(), Response.OK);
            responseAck(request, Response.OK);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("[回复BYE信息失败],{}", e.getMessage());
        }
        CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
            String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
            logger.info("[收到bye] {}/{}", platformGbId, channelId);
            if (sendRtpItem != null){
                String streamId = sendRtpItem.getStreamId();
                Map<String, Object> param = new HashMap<>();
                param.put("vhost","__defaultVhost__");
                param.put("app",sendRtpItem.getApp());
                param.put("stream",streamId);
                param.put("ssrc",sendRtpItem.getSsrc());
                logger.info("[收到bye] 停止向上级推流:{}", streamId);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
                if (totalReaderCount <= 0) {
                    logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
                    if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
                        Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                        if (device == null) {
                            logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
                        }
                        try {
                            logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId);
                            cmder.streamByeCmd(device, channelId, streamId, null);
                        } catch (InvalidArgumentException | ParseException | SipException |
                                 SsrcTransactionNotFoundException e) {
                            logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
                        }
                    }
                    if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
                        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
                    }
                }
            }
            // 可能是设备主动停止
            Device device = storager.queryVideoDeviceByChannelId(platformGbId);
            if (device != null) {
                storager.stopPlay(device.getDeviceId(), channelId);
                SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
                if (ssrcTransactionForPlay != null){
                    if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
                        // 释放ssrc
                        MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
                        if (mediaServerItem != null) {
                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
                        }
                        streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
                    }
                    InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
                    inviteStreamService.removeInviteInfo(inviteInfo);
                    if (inviteInfo != null) {
                        if (inviteInfo.getStreamInfo() != null) {
                            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
                        }
                    }
                }
                SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
                if (ssrcTransactionForPlayBack != null) {
                    // 释放ssrc
                    MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
                    if (mediaServerItem != null) {
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
                    }
                    streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
                    InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, device.getDeviceId(), channelId);
                    if (inviteInfo != null) {
                        inviteStreamService.removeInviteInfo(inviteInfo);
                        if (inviteInfo.getStreamInfo() != null) {
                            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
                        }
                    }
        SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
        if (sendRtpItem != null){
            logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
            String streamId = sendRtpItem.getStreamId();
            Map<String, Object> param = new HashMap<>();
            param.put("vhost","__defaultVhost__");
            param.put("app",sendRtpItem.getApp());
            param.put("stream",streamId);
            param.put("ssrc",sendRtpItem.getSsrc());
            logger.info("[收到bye] 停止向上级推流:{}", streamId);
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                    callIdHeader.getCallId(), null);
            zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                if (platform != null) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                            sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                            sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                    messageForPushChannel.setPlatFormIndex(platform.getId());
                    redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                }else {
                    logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                }
            }
            int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
            if (totalReaderCount <= 0) {
                logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
                if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
                    Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                    if (device == null) {
                        logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
                    }
                    try {
                        logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
                    } catch (InvalidArgumentException | ParseException | SipException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
                    }
                }
            }
        }else {
            // 可能是设备发送的停止
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
            if (ssrcTransaction == null) {
                logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求");
                logger.info(request.toString());
                return;
            }
            logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
            Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
            if (device == null) {
                logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
                return;
            }
            DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
            if (channel == null) {
                logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
                return;
            }
            storager.stopPlay(device.getDeviceId(), channel.getChannelId());
            StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channel.getChannelId());
            if (streamInfo != null) {
                redisCatchStorage.stopPlay(streamInfo);
                mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
            }
            // 释放ssrc
            MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
            if (mediaServerItem != null) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
            }
            streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -181,16 +181,11 @@
                            return;
                        } else {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                try {
                                    responseAck(request, Response.GONE);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                                }
                                return;
                            }else {
                                 // TODO 可能漏回复消息
                            if (streamPushItem != null) {
                                mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
                            }
                            if (mediaServerItem == null) {
                                mediaServerItem = mediaServerService.getDefaultMediaServer();
                            }
                        }
                    } else {
@@ -351,7 +346,9 @@
                    }
                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                            });
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
@@ -557,7 +554,9 @@
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                        });
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -596,7 +595,9 @@
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
                            return  redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                        });
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -712,7 +713,9 @@
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
                                return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
                            });
                    if (sendRtpItem == null) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
@@ -125,7 +125,7 @@
        strTmp = String.format("%02X", moveSpeed);
        builder.append(strTmp, 0, 2);
        builder.append(strTmp, 0, 2);
        //优化zoom低倍速下的变倍速率
        if ((zoomSpeed > 0) && (zoomSpeed <16))
        {
@@ -263,4 +263,4 @@
        }
        return localDateTime.format(DateUtil.formatterISO8601);
    }
}
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -22,6 +22,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -468,6 +469,13 @@
                            }
                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                        sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                            }
                        }
                    }
                }
@@ -513,7 +521,7 @@
                }
                return ret;
            }
            // 推流具有主动性,暂时不做处理
            // TODO 推流具有主动性,暂时不做处理
//            StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
//            if (streamPushItem != null) {
//                // TODO 发送停止
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -221,13 +221,14 @@
     * @param tcp 是否为tcp
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
                                         String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
        // 默认为随机端口
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
            if (userSetting.getGbSendStreamStrict()) {
                localPort = keepPort(serverItem, ssrc, localPort);
                localPort = keepPort(serverItem, ssrc, localPort, callback);
                if (localPort == 0) {
                    return null;
                }
@@ -259,11 +260,12 @@
     * @param tcp 是否为tcp
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
                                         String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
        // 默认为随机端口
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
            localPort = keepPort(serverItem, ssrc, localPort);
            localPort = keepPort(serverItem, ssrc, localPort, callback);
            if (localPort == 0) {
                return null;
            }
@@ -284,10 +286,14 @@
        return sendRtpItem;
    }
    public interface KeepPortCallback{
        Boolean keep(String ssrc);
    }
    /**
     * 保持端口,直到需要发流时再释放
     */
    public int keepPort(MediaServerItem serverItem, String ssrc, Integer localPort) {
    public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
        Map<String, Object> param = new HashMap<>(3);
        param.put("port", localPort);
        param.put("enable_tcp", 1);
@@ -296,18 +302,20 @@
        if (jsonObject.getInteger("code") == 0) {
            localPort = jsonObject.getInteger("port");
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            Integer finalLocalPort = localPort;
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (MediaServerItem mediaServerItem, HookParam hookParam)->{
                        logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
                        OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
                        if (!ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
                            return;
                        }
                        int port = keepPort(serverItem, ssrc, finalLocalPort);
                        if (port == 0) {
                            logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
                            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                        if (ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
                            if (keepPortCallback.keep(ssrc)) {
                                logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
                                keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
                            }else {
                                logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
                                releasePort(serverItem, ssrc);
                            }
                        }
                    });
            logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.service.bean;
import java.util.stream.Stream;
/**
 * 当上级平台
 * @author lin
@@ -29,9 +27,14 @@
    private String gbId;
    /**
     * 请求的平台ID
     * 请求的平台国标编号
     */
    private String platFormId;
    /**
     * 请求的平台自增ID
     */
    private int platFormIndex;
    /**
     * 请求平台名称
@@ -128,4 +131,12 @@
    /**
     * Assigns the {@code mediaServerId} field of this message.
     *
     * @param mediaServerId value to store
     */
    public void setMediaServerId(String mediaServerId) {
        this.mediaServerId = mediaServerId;
    }
    /**
     * Returns the auto-increment ID of the requesting platform.
     *
     * @return the stored {@code platFormIndex}
     */
    public int getPlatFormIndex() {
        return platFormIndex;
    }
    /**
     * Sets the auto-increment ID of the requesting platform.
     *
     * @param platFormIndex value to store
     */
    public void setPlatFormIndex(int platFormIndex) {
        this.platFormIndex = platFormIndex;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -105,6 +105,7 @@
            // 行政区划默认去编号的前6位
            parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6));
        }
        parentPlatform.setTreeType("CivilCode");
        parentPlatform.setCatalogId(parentPlatform.getDeviceGBId());
        int result = platformMapper.addParentPlatform(parentPlatform);
        // 添加缓存
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -13,6 +13,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +27,7 @@
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -127,6 +129,7 @@
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                default:
                                    break;
@@ -311,7 +314,9 @@
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp(), content.getRtcp());
                content.getTcp(), content.getRtcp(), ssrcFromCallback -> {
                    return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
                });
        WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
        result.setCode(0);
@@ -388,4 +393,31 @@
        });
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) {
            platformGbId = "*";
        }
        if (channelId == null) {
            channelId = "*";
        }
        if (streamId == null) {
            streamId = "*";
        }
        if (callId == null) {
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
                + callId;
        List<Object> scan = RedisUtil.scan(redisTemplate, key);
        if (scan.size() > 0) {
            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
        }else {
            return null;
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
New file
@@ -0,0 +1,120 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 接收redis发送的结束推流请求
 * @author lin
 */
@Component
public class RedisPushStreamCloseResponseListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
    @Autowired
    private IStreamPushService streamPushService;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IVideoManagerStorage storager;
    @Autowired
    private ISIPCommanderForPlatform commanderFroPlatform;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
    public interface PushStreamResponseEvent{
        void run(MessageForPushChannelResponse response);
    }
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody()));
        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
        if (push != null) {
            if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
                List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
                        push.getGbId());
                if (sendRtpItems.size() > 0) {
                    for (SendRtpItem sendRtpItem : sendRtpItems) {
                        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                        // 停止向上级推流
                        String streamId = sendRtpItem.getStreamId();
                        Map<String, Object> param = new HashMap<>();
                        param.put("vhost","__defaultVhost__");
                        param.put("app",sendRtpItem.getApp());
                        param.put("stream",streamId);
                        param.put("ssrc",sendRtpItem.getSsrc());
                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                        zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                        try {
                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                        }
                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                    sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                        }
                    }
                }
            }
        }
    }
    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
        responseEvents.put(app + stream, callback);
    }
    public void removeEvent(String app, String stream) {
        responseEvents.remove(app + stream);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -202,5 +202,10 @@
    void removeAllDevice();
    void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
    void sendChannelAddOrDelete(String deviceId, String channelId, boolean add);
    void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
    void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -622,4 +622,18 @@
        // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
        stringRedisTemplate.convertAndSend(key, msg.toString());
    }
    @Override
    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
        logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
        String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
        logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java
@@ -97,6 +97,9 @@
                cmdCode = 32;
                break;
            case "stop":
                horizonSpeed = 0;
                verticalSpeed = 0;
                zoomSpeed = 0;
                break;
            default:
                break;
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
New file
@@ -0,0 +1,117 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller mapped under {@code /api/rtp} for third-party RTP integration.
 * <p>
 * The endpoints are currently skeletons: {@link #openRtpServer} only checks that a
 * media server is available and then returns {@code null}; the remaining handlers
 * have empty bodies.
 */
@SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接")
@RestController
@RequestMapping("/api/rtp")
public class RtpController {
    // Injected collaborators. Only mediaServerService is used by the code in this class so far.
    @Autowired
    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private VersionInfo versionInfo;
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IDeviceService deviceService;
    @Autowired
    private IDeviceChannelService channelService;
    @Autowired
    private IStreamPushService pushService;
    @Autowired
    private IStreamProxyService proxyService;
    // Value of the "server.port" configuration property.
    @Value("${server.port}")
    private int serverPort;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    /**
     * GET {@code /api/rtp/receive/open}: intended to open an RTP receiver and
     * optionally return send information.
     * <p>
     * Not implemented yet: after confirming a media server is available it always
     * returns {@code null}, and none of the request parameters are read.
     *
     * @return currently always {@code null}
     * @throws ControllerException with {@code ErrorCode.ERROR100} when no media server is available
     */
    @GetMapping(value = "/receive/open")
    @ResponseBody
    @Operation(summary = "开启收流和获取发流信息")
    @Parameter(name = "isSend", description = "是否发送,false时只开启收流, true同时返回推流信息", required = true)
    @Parameter(name = "callId", description = "整个过程的唯一标识,为了与后续接口关联", required = true)
    @Parameter(name = "ssrc", description = "来源流的SSRC,不传则不校验来源ssrc", required = false)
    @Parameter(name = "stream", description = "形成的流的ID", required = true)
    @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true)
    @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true)
    public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
        // Pick the media server with the lowest load; fail fast when none is available.
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        if (mediaServerItem == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
        }
        // TODO: the receiver is not opened yet; placeholder result.
        return null;
    }
    /**
     * GET {@code /api/rtp/receive/close}: intended to close an RTP receiver.
     * Not implemented yet (empty body).
     */
    @GetMapping(value = "/receive/close")
    @ResponseBody
    @Operation(summary = "关闭收流")
    @Parameter(name = "stream", description = "流的ID", required = true)
    public void closeRtpServer(String stream) {
    }
    /**
     * GET {@code /api/rtp/send/start}: intended to start sending a stream over RTP.
     * Not implemented yet (empty body).
     */
    @GetMapping(value = "/send/start")
    @ResponseBody
    @Operation(summary = "发送流")
    @Parameter(name = "ssrc", description = "发送流的SSRC", required = true)
    @Parameter(name = "ip", description = "目标IP", required = true)
    @Parameter(name = "port", description = "目标端口", required = true)
    @Parameter(name = "app", description = "待发送应用名", required = true)
    @Parameter(name = "stream", description = "待发送流Id", required = true)
    @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
    @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
    @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
    public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) {
    }
    /**
     * GET {@code /api/rtp/send/stop}: intended to stop sending a stream.
     * Not implemented yet (empty body).
     */
    // NOTE(review): the callId description below looks copied from /send/start
    // ("random port when omitted" while required = true) — confirm the intended wording.
    @GetMapping(value = "/send/stop")
    @ResponseBody
    @Operation(summary = "关闭发送流")
    @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
    public void closeSendRTP(String callId) {
    }
}