648540858
2023-06-30 1458014fe304e6a492a66c9a7b69600d47efc1d8
修复合并主线后语音对讲失败的问题
6个文件已修改
120 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 85 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,8 +3,6 @@
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -16,7 +14,6 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -79,9 +76,6 @@
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IPlayService playService;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -6,7 +6,6 @@
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -16,10 +15,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -38,7 +33,6 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@@ -164,53 +158,54 @@
            }
        }
            // 可能是设备发送的停止
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
            if (ssrcTransaction == null) {
            if (ssrcTransaction == null && sendRtpItem == null) {
                logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求");
                logger.info(request.toString());
                return;
            }
            logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
            if (ssrcTransaction != null) {
                logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
            Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
            if (device == null) {
                logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
                return;
            }
            DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
            if (channel == null) {
                logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
                return;
            }
            storager.stopPlay(device.getDeviceId(), channel.getChannelId());
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
            if (inviteInfo != null) {
                inviteStreamService.removeInviteInfo(inviteInfo);
                if (inviteInfo.getStreamInfo() != null) {
                    mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream());
                Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
                if (device == null) {
                    logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
                    return;
                }
                DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
                if (channel == null) {
                    logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
                    return;
                }
                storager.stopPlay(device.getDeviceId(), channel.getChannelId());
                InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
                if (inviteInfo != null) {
                    inviteStreamService.removeInviteInfo(inviteInfo);
                    if (inviteInfo.getStreamInfo() != null) {
                        mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream());
                    }
                }
                // 释放ssrc
                MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
                if (mediaServerItem != null) {
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
                }
                streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
                if (ssrcTransaction.getType() == InviteSessionType.BROADCAST) {
                    // 查找来源的对讲设备,发送停止
                    Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
                    if (sourceDevice != null) {
                        playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getChannelId());
                    }
                }
                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getChannelId());
                if (audioBroadcastCatch != null) {
                    // 来自上级平台的停止对讲
                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", ssrcTransaction.getDeviceId(), channel.getChannelId());
                    audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channel.getChannelId());
                }
            }
            // 释放ssrc
            MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
            if (mediaServerItem != null) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
            }
            streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
            if (ssrcTransaction.getType() == InviteSessionType.BROADCAST) {
                // 查找来源的对讲设备,发送停止
                Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
                if (sourceDevice != null) {
                    playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getChannelId());
                }
            }
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getChannelId());
            if (audioBroadcastCatch != null) {
                // 来自上级平台的停止对讲
                logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", ssrcTransaction.getDeviceId(), channel.getChannelId());
                audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channel.getChannelId());
            }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -1030,10 +1030,12 @@
                }
                logger.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc,
                        mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        device.getDeviceId(), broadcastCatch.getChannelId(),
                        mediaTransmissionTCP, false);
                        mediaTransmissionTCP, false, ssrcFromCallback -> {
                            return redisCatchStorage.querySendRTPServer(requesterId, channelId, null, callIdHeader.getCallId()) != null;
                        });
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -1048,7 +1050,7 @@
                }
                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
                sendRtpItem.setPlayType(InviteStreamType.BROADCAST);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                sendRtpItem.setPlatformId(requesterId);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -11,9 +11,6 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -309,7 +306,7 @@
            localPort = jsonObject.getInteger("port");
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            Integer finalLocalPort = localPort;
            int finalLocalPort = localPort;
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (MediaServerItem mediaServerItem, HookParam hookParam)->{
                        logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
@@ -324,7 +321,7 @@
                            }
                        }
                    });
            logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
            logger.info("[上级点播] {}->: {}", ssrc, localPort);
            return localPort;
        }else {
            logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort);
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -235,8 +235,10 @@
        sendRtpItem.setUsePs(false);
        sendRtpItem.setReceiveStream(stream + "_talk");
        int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc, null);
        String callId = SipUtils.getNewCallId();
        int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc, 0, ssrcFromCallback ->{
            return  redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, callId) != null;
        });
        //端口获取失败的ssrcInfo 没有必要发送点播指令
        if (port <= 0) {
            logger.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channelId);
@@ -264,7 +266,7 @@
            }
        }, userSetting.getPlayTimeout());
        String callId = SipUtils.getNewCallId();
        zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc);
        Map<String, Object> param = new HashMap<>(12);
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -80,7 +80,7 @@
                    for (SendRtpItem sendRtpItem : sendRtpItems) {
                        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                        // 停止向上级推流
                        String streamId = sendRtpItem.getStreamId();
                        String streamId = sendRtpItem.getStream();
                        Map<String, Object> param = new HashMap<>();
                        param.put("vhost","__defaultVhost__");
                        param.put("app",sendRtpItem.getApp());
@@ -88,7 +88,7 @@
                        param.put("ssrc",sendRtpItem.getSsrc());
                        logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
                        zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
                        try {
@@ -98,7 +98,7 @@
                        }
                        if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                    sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                            messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);