leesam
2024-04-10 ce950dea4aef933a12e78e4fe1f535ba9a83c3df
Merge branch 'refs/heads/master' into develop-add-api-key
19个文件已修改
433 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
public class SendRtpItem {
    /**
@@ -122,6 +124,23 @@
     */
    private String receiveStream;
    public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
        SendRtpItem sendRtpItem = new SendRtpItem();
        sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
        sendRtpItem.setApp(requestPushStreamMsg.getApp());
        sendRtpItem.setStream(requestPushStreamMsg.getStream());
        sendRtpItem.setIp(requestPushStreamMsg.getIp());
        sendRtpItem.setPort(requestPushStreamMsg.getPort());
        sendRtpItem.setSsrc(requestPushStreamMsg.getSsrc());
        sendRtpItem.setTcp(requestPushStreamMsg.isTcp());
        sendRtpItem.setLocalPort(requestPushStreamMsg.getSrcPort());
        sendRtpItem.setPt(requestPushStreamMsg.getPt());
        sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
        sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
        return sendRtpItem;
    }
    public String getIp() {
        return ip;
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -75,8 +75,6 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    /**
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -13,11 +13,10 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -64,9 +63,6 @@
    @Autowired
    private SipSubscribe sipSubscribe;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SipLayer sipLayer;
@@ -846,7 +842,7 @@
        MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem != null) {
            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
            zlmServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream());
            mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
        }
        SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
        if (byeRequest == null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -1,22 +1,17 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -28,17 +23,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
 * SIP命令类型: ACK请求
@@ -70,12 +60,6 @@
    @Autowired
    private IDeviceService deviceService;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private HookSubscribe hookSubscribe;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -122,11 +106,8 @@
        if (parentPlatform != null) {
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                        sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                        sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                        sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
                redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
                redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
                    playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
                });
            } else {
@@ -134,7 +115,7 @@
                    if (sendRtpItem.isTcpActive()) {
                        mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
                    } else {
                        mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem);
                        mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
                    }
                }catch (ControllerException e) {
                    logger.error("RTP推流失败: {}", e.getMessage());
@@ -159,7 +140,7 @@
                if (sendRtpItem.isTcpActive()) {
                    mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
                } else {
                    mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem);
                    mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
                }
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -6,16 +6,15 @@
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -36,8 +35,6 @@
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
 * SIP命令类型: BYE请求
@@ -76,12 +73,6 @@
    private IVideoManagerStorage storager;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
@@ -110,7 +101,6 @@
    /**
     * 处理BYE请求
     * @param evt
     */
    @Override
    public void process(RequestEvent evt) {
@@ -128,11 +118,6 @@
            logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
            String streamId = sendRtpItem.getStream();
            Map<String, Object> param = new HashMap<>();
            param.put("vhost","__defaultVhost__");
            param.put("app",sendRtpItem.getApp());
            param.put("stream",streamId);
            param.put("ssrc",sendRtpItem.getSsrc());
            logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
@@ -149,7 +134,7 @@
                    MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                            callIdHeader.getCallId(), null);
                    zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                    mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
                    if (userSetting.getUseCustomSsrcForParentInvite()) {
                        mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                    }
@@ -169,13 +154,13 @@
                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                        callIdHeader.getCallId(), null);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
                if (userSetting.getUseCustomSsrcForParentInvite()) {
                    mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                }
            }
            MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            if (mediaInfo != null) {
            MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            if (mediaServer != null) {
                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
                    // 来自上级平台的停止对讲
@@ -183,8 +168,9 @@
                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                }
                int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
                if (totalReaderCount <= 0) {
                MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
                if (mediaInfo.getReaderCount() <= 0) {
                    logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
                    if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
                        Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -8,6 +8,7 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
@@ -24,7 +25,6 @@
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IInviteStreamService;
@@ -61,7 +61,6 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@@ -112,9 +111,6 @@
    @Autowired
    private AudioBroadcastManager audioBroadcastManager;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -382,8 +378,9 @@
                    } else {
                        streamTypeStr = "UDP";
                    }
                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}",
                            sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                    SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (tcpActive != null) {
@@ -462,30 +459,10 @@
                            responseSdpAck(request, content.toString(), platform);
                            // tcp主动模式,回复sdp后开启监听
                            if (sendRtpItem.isTcpActive()) {
                                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                Map<String, Object> param = new HashMap<>(12);
                                param.put("vhost","__defaultVhost__");
                                param.put("app",sendRtpItem.getApp());
                                param.put("stream",sendRtpItem.getStream());
                                param.put("ssrc", sendRtpItem.getSsrc());
                                if (!sendRtpItem.isTcpActive()) {
                                    param.put("dst_url",sendRtpItem.getIp());
                                    param.put("dst_port", sendRtpItem.getPort());
                                }
                                String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
                                param.put("is_udp", is_Udp);
                                param.put("src_port", localPort);
                                param.put("pt", sendRtpItem.getPt());
                                param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
                                param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
                                if (!sendRtpItem.isTcp()) {
                                    // 开启rtcp保活
                                    param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
                                }
                                JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
                                if (startSendRtpStreamResult != null) {
                                    startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
                                }
                                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                try {
                                    mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
                                }catch (ControllerException e) {}
                            }
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
@@ -638,13 +615,14 @@
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServer mediaServerItem,
                            CallIdHeader callIdHeader, MediaServer mediaServer,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
            if (sendRtpItem == null) {
@@ -665,7 +643,7 @@
            sendRtpItem.setCallId(callIdHeader.getCallId());
            sendRtpItem.setFromTag(request.getFromTag());
            SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
            if (response != null) {
                sendRtpItem.setToTag(response.getToTag());
            }
@@ -684,7 +662,7 @@
            Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
@@ -794,7 +772,7 @@
                dynamicTask.stop(callIdHeader.getCallId());
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                    SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {
@@ -1074,7 +1052,7 @@
                        mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
                SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
                        device.getDeviceId(), broadcastCatch.getChannelId(),
                        mediaTransmissionTCP, false);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
@@ -1,16 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -61,9 +60,6 @@
    @Autowired
    private AudioBroadcastManager audioBroadcastManager;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -155,12 +151,13 @@
                                    }
                                }else {
                                    // 发流
                                    JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem);
                                    if (jsonObject != null && jsonObject.getInteger("code") == 0 ) {
                                        logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
                                    }else {
                                        logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject);
                                    try {
                                        mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
                                    }catch (ControllerException e) {
                                        logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
                                        return;
                                    }
                                    logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
                                }
                            }
                        }else {
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
@@ -141,5 +141,10 @@
    void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
    void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
    void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
    SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);
    SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
                                  String app, String stream, String channelId, boolean tcp, boolean rtcp);
}
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -19,6 +19,7 @@
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
@@ -82,6 +83,9 @@
    @Autowired
    private MediaConfig mediaConfig;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
@@ -812,7 +816,7 @@
    }
    @Override
    public void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
    public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
@@ -821,7 +825,10 @@
        logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
        sendPlatformStartPlayMsg(platform, sendRtpItem);
        if (platform != null) {
            sendPlatformStartPlayMsg(platform, sendRtpItem);
        }
    }
@@ -834,4 +841,50 @@
            redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
        }
    }
    @Override
    public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) {
        int localPort = sendRtpPortManager.getNextPort(mediaServer);
        if (localPort == 0) {
            return null;
        }
        SendRtpItem sendRtpItem = new SendRtpItem();
        sendRtpItem.setIp(ip);
        sendRtpItem.setPort(port);
        sendRtpItem.setSsrc(ssrc);
        sendRtpItem.setPlatformId(deviceId);
        sendRtpItem.setDeviceId(deviceId);
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(isTcp);
        sendRtpItem.setRtcp(rtcp);
        sendRtpItem.setApp("rtp");
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(mediaServer.getId());
        return sendRtpItem;
    }
    @Override
    public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
                                         String app, String stream, String channelId, boolean tcp, boolean rtcp){
        int localPort = sendRtpPortManager.getNextPort(serverItem);
        if (localPort == 0) {
            return null;
        }
        SendRtpItem sendRtpItem = new SendRtpItem();
        sendRtpItem.setIp(ip);
        sendRtpItem.setPort(port);
        sendRtpItem.setSsrc(ssrc);
        sendRtpItem.setApp(app);
        sendRtpItem.setStream(stream);
        sendRtpItem.setPlatformId(platformId);
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(tcp);
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(serverItem.getId());
        sendRtpItem.setRtcp(rtcp);
        return sendRtpItem;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -3,15 +3,13 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
@@ -20,7 +18,7 @@
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -38,25 +36,15 @@
    private GbStreamMapper gbStreamMapper;
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    @Autowired
    private IStreamPushService streamPushService;
    @Autowired
    private IStreamProxyService streamProxyService;
    @Autowired
    private StreamPushMapper streamPushMapper;
    @Autowired
    private HookSubscribe subscribe;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
@@ -316,11 +316,23 @@
        if (timeout  != null) {
            param.put("close_delay_ms", timeout);
        }
        if (!sendRtpItem.isTcp()) {
            // 开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
        }
        if (!sendRtpItem.isTcpActive()) {
            param.put("dst_url",sendRtpItem.getIp());
            param.put("dst_port", sendRtpItem.getPort());
        }
        JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null);
        if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
            logger.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
            throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
        }
        logger.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
        logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
                jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
    }
    @Override
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -5,7 +5,6 @@
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,9 +24,6 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private HookSubscribe hookSubscribe;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
/**
 * redis消息:请求下级推送流信息
 * @author lin
@@ -80,6 +82,22 @@
        return requestPushStreamMsg;
    }
    public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) {
        RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
        requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId());
        requestPushStreamMsg.setApp(sendRtpItem.getApp());
        requestPushStreamMsg.setStream(sendRtpItem.getStream());
        requestPushStreamMsg.setIp(sendRtpItem.getIp());
        requestPushStreamMsg.setPort(sendRtpItem.getPort());
        requestPushStreamMsg.setSsrc(sendRtpItem.getSsrc());
        requestPushStreamMsg.setTcp(sendRtpItem.isTcp());
        requestPushStreamMsg.setSrcPort(sendRtpItem.getLocalPort());
        requestPushStreamMsg.setPt(sendRtpItem.getPt());
        requestPushStreamMsg.setPs(sendRtpItem.isUsePs());
        requestPushStreamMsg.setOnlyAudio(sendRtpItem.isOnlyAudio());
        return requestPushStreamMsg;
    }
    public String getMediaServerId() {
        return mediaServerId;
    }
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -11,13 +13,12 @@
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -41,7 +42,9 @@
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
/**
 * @author lin
@@ -76,9 +79,6 @@
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SubscribeHolder subscribeHolder;
    @Autowired
@@ -86,9 +86,6 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private HookSubscribe subscribe;
    @Autowired
    private VideoStreamSessionManager streamSession;
@@ -437,11 +434,7 @@
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>(3);
                param.put("vhost", "__defaultVhost__");
                param.put("app", sendRtpItem.getApp());
                param.put("stream", sendRtpItem.getStream());
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
            }
        }
    }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -25,7 +24,6 @@
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
@@ -1421,11 +1419,8 @@
        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaInfo == null) {
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
            });
        } else {
@@ -1433,7 +1428,7 @@
                if (sendRtpItem.isTcpActive()) {
                    mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                } else {
                    mediaServerService.startSendRtpStream(mediaInfo, platform, sendRtpItem);
                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                }
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -9,15 +9,14 @@
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
@@ -63,9 +62,6 @@
    @Autowired
    private IVideoManagerStorage videoManagerStorager;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private StreamProxyMapper streamProxyMapper;
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -5,12 +5,12 @@
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -27,7 +27,6 @@
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -73,9 +72,6 @@
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
@@ -101,7 +97,7 @@
    }
    public interface PlayMsgCallbackForStartSendRtpStream{
        void handler(JSONObject jsonObject);
        void handler();
    }
    public interface PlayMsgErrorCallback{
@@ -181,11 +177,10 @@
                                    String serial = wvpRedisMsg.getSerial();
                                    switch (wvpResult.getCode()) {
                                        case 0:
                                            JSONObject jsonObject = (JSONObject)wvpResult.getData();
                                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
                                            if (playMsgCallback != null) {
                                                callbacksForError.remove(serial);
                                                playMsgCallback.handler(jsonObject);
                                                playMsgCallback.handler();
                                            }
                                            break;
                                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
@@ -219,36 +214,24 @@
     * 处理收到的请求推流的请求
     */
    private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
        MediaServer mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
        if (mediaInfo == null) {
        MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
        if (mediaServer == null) {
            // TODO 回复错误
            return;
        }
        String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",requestPushStreamMsg.getApp());
        param.put("stream",requestPushStreamMsg.getStream());
        param.put("ssrc", requestPushStreamMsg.getSsrc());
        param.put("dst_url",requestPushStreamMsg.getIp());
        param.put("dst_port", requestPushStreamMsg.getPort());
        param.put("is_udp", is_Udp);
        param.put("src_port", requestPushStreamMsg.getSrcPort());
        param.put("pt", requestPushStreamMsg.getPt());
        param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
        param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param);
        SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
        try {
            mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
        }catch (ControllerException e) {
            return;
        }
        // 回复消息
        responsePushStream(jsonObject, fromId, serial);
    }
    private void responsePushStream(JSONObject content, String toId, String serial) {
        WVPResult<JSONObject> result = new WVPResult<>();
        result.setCode(0);
        result.setData(content);
        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
        WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -317,7 +300,7 @@
     * 将获取到的sendItem发送出去
     */
    private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
        SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
        SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp(), content.getRtcp());
@@ -453,13 +436,8 @@
            // TODO 回复错误
            return;
        }
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
        if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
            // 发送redis消息
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -8,7 +8,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -25,7 +24,6 @@
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,9 +55,6 @@
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
@@ -88,16 +83,10 @@
                    }
                    if (push.isSelf()) {
                        // 停止向上级推流
                        String streamId = sendRtpItem.getStream();
                        Map<String, Object> param = new HashMap<>();
                        param.put("vhost","__defaultVhost__");
                        param.put("app",sendRtpItem.getApp());
                        param.put("stream",streamId);
                        param.put("ssrc",sendRtpItem.getSsrc());
                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", sendRtpItem.getStream());
                        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
                        zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                        mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java
@@ -13,6 +13,7 @@
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
@@ -81,8 +82,8 @@
        logger.info("[第三方PS服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
                isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
        if (mediaServerItem == null) {
        MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
        if (mediaServer == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
        }
        if (stream == null) {
@@ -100,13 +101,14 @@
            }
        }
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_"  + stream;
        int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode);
        if (localPort == 0) {
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
        if (ssrcInfo.getPort() == 0) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
        }
        // 注册回调如果rtp收流超时则通过回调发送通知
        if (callBack != null) {
            Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
            Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            hookSubscribe.addSubscribe(hook,
                    (hookData)->{
@@ -128,8 +130,8 @@
                    });
        }
        OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
        otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
        otherPsSendInfo.setReceivePort(localPort);
        otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
        otherPsSendInfo.setReceivePort(ssrcInfo.getPort());
        otherPsSendInfo.setCallId(callId);
        otherPsSendInfo.setStream(stream);
@@ -138,9 +140,9 @@
        if (isSend != null && isSend) {
            String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_"  + callId;
            // 预创建发流信息
            int port = sendRtpPortManager.getNextPort(mediaServerItem);
            int port = sendRtpPortManager.getNextPort(mediaServer);
            otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
            otherPsSendInfo.setSendLocalIp(mediaServer.getSdpIp());
            otherPsSendInfo.setSendLocalPort(port);
            // 将信息写入redis中,以备后用
            redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS);
@@ -156,7 +158,7 @@
    public void closeRtpServer(String stream) {
        logger.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
        zlmServerFactory.closeRtpServer(mediaServerItem,stream);
        mediaServerService.closeRTPServer(mediaServerItem, stream);
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_"  + stream;
        List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
        if (!scan.isEmpty()) {
@@ -198,7 +200,7 @@
                        app,
                        stream,
                        callId);
        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
        MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
        String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_"  + callId;
        OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo == null) {
@@ -224,9 +226,10 @@
        param.put("src_port", sendInfo.getSendLocalPort());
        Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream);
        Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
        if (streamReady) {
            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
//            mediaServerService.startSendRtp(mediaServer, );
            if (jsonObject.getInteger("code") == 0) {
                logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, param);
                redisTemplate.opsForValue().set(key, sendInfo);
@@ -238,7 +241,7 @@
        }else {
            logger.info("[第三方PS服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
            String uuid = UUID.randomUUID().toString();
            Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId());
            Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
            dynamicTask.startDelay(uuid, ()->{
                logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId);
                redisTemplate.delete(key);
@@ -257,7 +260,7 @@
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
                        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
                        if (jsonObject.getInteger("code") == 0) {
                            logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, param);
                            redisTemplate.opsForValue().set(key, finalSendInfo);