648540858
2022-05-10 f8f65d473bec182abeecd6fd17a9d4c4c4cfc7c5
优化语音广播流程
8个文件已修改
216 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
@@ -1,6 +1,11 @@
package com.genersoft.iot.vmp.gb28181.bean;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import javax.sip.Dialog;
/**
 * 缓存语音广播的状态
 * @author lin
@@ -32,6 +37,16 @@
     */
    private AudioBroadcastCatchStatus status;
    /**
     * Request information (the SIP request kept with this broadcast entry).
     */
    private SIPRequest request;
    /**
     * Dialog (session) information (the SIP dialog kept with this broadcast entry).
     */
    private SIPDialog dialog;
    public String getDeviceId() {
        return deviceId;
@@ -56,4 +71,20 @@
    /**
     * Replaces the status stored on this voice-broadcast cache entry.
     *
     * @param status the new status value
     */
    public void setStatus(AudioBroadcastCatchStatus status) {
        this.status = status;
    }
    public void setDialog(SIPDialog dialog) {
        this.dialog = dialog;
    }
    public SIPDialog getDialog() {
        return dialog;
    }
    public SIPRequest getRequest() {
        return request;
    }
    public void setRequest(SIPRequest request) {
        this.request = request;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java
@@ -1,13 +1,14 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * 语音广播消息管理类
@@ -15,6 +16,9 @@
 */
@Component
public class AudioBroadcastManager {
    @Autowired
    private SipConfig config;
    public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>();
@@ -54,6 +58,16 @@
    }
    public AudioBroadcastCatch get(String deviceId, String channelId) {
        return data.get(deviceId + channelId);
        AudioBroadcastCatch audioBroadcastCatch = data.get(deviceId + channelId);
        if (audioBroadcastCatch == null) {
            Stream<AudioBroadcastCatch> allAudioBroadcastCatchStreamForDevice = data.values().stream().filter(
                    audioBroadcastCatchItem -> Objects.equals(audioBroadcastCatchItem.getDeviceId(), deviceId));
            List<AudioBroadcastCatch> audioBroadcastCatchList = allAudioBroadcastCatchStreamForDevice.collect(Collectors.toList());
            if (audioBroadcastCatchList.size() == 1 && Objects.equals(config.getId(), channelId)) {
                audioBroadcastCatch = audioBroadcastCatchList.get(0);
            }
        }
        return audioBroadcastCatch;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -18,6 +18,8 @@
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,15 +30,18 @@
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.*;
/**
 * SIP命令类型: ACK请求
 * @author lin
 */
@Component
public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
@@ -96,8 +101,8 @@
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId);
            // 取消设置的超时任务
            dynamicTask.stop(callIdHeader.getCallId());
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
//            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId());
            String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId());
@@ -121,7 +126,14 @@
            } else {
                logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
                if (sendRtpItem.isOnlyAudio()) {
                    // TODO 可能是语音对讲
                    // 语音对讲
                    try {
                        cmder.streamByeCmd((SIPDialog) evt.getDialog(), (SIPRequest)evt.getRequest(), null);
                    } catch (SipException e) {
                        throw new RuntimeException(e);
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    }
                }else {
                    // 向上级平台
                    commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -13,6 +13,7 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -65,6 +66,9 @@
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private IPlayService playService;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -106,6 +110,9 @@
                        if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
                            cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
                        }
                        if (sendRtpItem.isOnlyAudio()) {
                            playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId);
                        }
                        if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                            MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
                            messageForPushChannel.setType(0);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -114,6 +114,7 @@
    private SipConfig config;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -492,7 +493,6 @@
                                gbStream.getApp(), gbStream.getStream(), channelId,
                                mediaTransmissionTCP);
                        if (sendRtpItem == null) {
                            logger.warn("服务器端口资源不足");
                            responseAck(evt, Response.BUSY_HERE);
@@ -562,25 +562,16 @@
        }
    }
    public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException {
    public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException {
        // 兼容奇葩的海康这里使用的不是通道编号而是本平台编号
//        if (channelId.equals(config.getId())) {
//            List<AudioBroadcastCatch> all = audioBroadcastManager.getAll();
//            for (AudioBroadcastCatch audioBroadcastCatch : all) {
//                if (audioBroadcastCatch.getDeviceId().equals(requesterId)) {
//                    channelId = audioBroadcastCatch.getChannelId();
//                }
//            }
//        }
//        // 兼容失败
//        if (channelId.equals(config.getId())) {
//            responseAck(evt, Response.BAD_REQUEST);
//            return;
//        }
        // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
        Device device = redisCatchStorage.getDevice(requesterId);
        AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1);
        if (audioBroadcastCatch == null) {
            logger.warn("来自设备的Invite请求非语音广播,已忽略");
            responseAck(evt, Response.FORBIDDEN);
            return;
        }
        Request request = evt.getRequest();
        if (device != null) {
            logger.info("收到设备" + requesterId + "的语音广播Invite请求");
@@ -606,7 +597,6 @@
            // 查看是否支持PS 负载96
            int port = -1;
            //boolean recvonly = false;
            boolean mediaTransmissionTCP = false;
            Boolean tcpActive = null;
            for (int i = 0; i < mediaDescriptions.size(); i++) {
@@ -638,7 +628,6 @@
                responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
                return;
            }
            String sessionName = sdp.getSessionName().getValue();
            String addressStr = sdp.getOrigin().getAddress();
            logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc);
@@ -649,20 +638,19 @@
                return;
            }
            SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                    device.getDeviceId(), channelId,
                    device.getDeviceId(), audioBroadcastCatch.getChannelId(),
                    mediaTransmissionTCP);
            sendRtpItem.setTcp(mediaTransmissionTCP);
            if (tcpActive != null) {
                sendRtpItem.setTcpActive(tcpActive);
            }
            if (sendRtpItem == null) {
                logger.warn("服务器端口资源不足");
                responseAck(evt, Response.BUSY_HERE);
                return;
            }
            sendRtpItem.setTcp(mediaTransmissionTCP);
            if (tcpActive != null) {
                sendRtpItem.setTcpActive(tcpActive);
            }
            String app = "broadcast";
            String stream = device.getDeviceId() + "_" + channelId;
            String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId();
            CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
            sendRtpItem.setPlayType(InviteStreamType.PLAY);
@@ -685,12 +673,9 @@
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            String finalSsrc = ssrc;
            String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId;
            // 流已经存在时直接推流
            if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) {
                logger.info("发现已经在推流");
                dynamicTask.stop(waiteStreamTimeoutTaskKey);
                sendRtpItem.setStatus(2);
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                StringBuffer content = new StringBuffer(200);
@@ -711,6 +696,10 @@
                parentPlatform.setServerGBId(device.getDeviceId());
                try {
                    responseSdpAck(evt, content.toString(), parentPlatform);
                    Dialog dialog = evt.getDialog();
                    audioBroadcastCatch.setDialog((SIPDialog) dialog);
                    audioBroadcastCatch.setRequest((SIPRequest) request);
                    audioBroadcastManager.update(audioBroadcastCatch);
                } catch (SipException e) {
                    throw new RuntimeException(e);
                } catch (InvalidArgumentException e) {
@@ -721,19 +710,16 @@
            }else {
                // 流不存在时监听流上线
                // 设置等待推流的超时; 默认20s
                String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId();
                dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
                    logger.info("等待推流超时: {}/{}", app, stream);
                    if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
                        audioBroadcastManager.del(device.getDeviceId(), channelId);
                    }else {
                        // 兼容海康使用了错误的通道ID的情况
                        audioBroadcastManager.delByDeviceId(device.getDeviceId());
                    }
                    playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
                    // 发送bye
                    try {
                        cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null);
                        responseAck(evt, Response.BUSY_HERE);
                    } catch (SipException e) {
                        throw new RuntimeException(e);
                    } catch (InvalidArgumentException e) {
                        throw new RuntimeException(e);
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
@@ -743,10 +729,11 @@
                subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                        (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                            sendRtpItem.setStatus(2);
                            dynamicTask.stop(waiteStreamTimeoutTaskKey);
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
                            StringBuffer content = new StringBuffer(200);
                            content.append("v=0\r\n");
                            content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
                            content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
                            content.append("s=Play\r\n");
                            content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
                            content.append("t=0 0\r\n");
@@ -771,8 +758,6 @@
                            }
                        });
            }
            String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId;
            dynamicTask.stop(timeOutTaskKey);
            String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId();
            WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
            wvpResult.setCode(0);
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -43,4 +43,5 @@
    StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);
    void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event);
    /**
     * Stops the voice broadcast of a device channel.
     *
     * @param deviceId  id of the device whose broadcast should be stopped
     * @param channelId id of the channel the broadcast was started on
     */
    void stopAudioBroadcast(String deviceId, String channelId);
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -5,6 +5,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
@@ -44,9 +45,13 @@
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@@ -92,6 +97,9 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private DynamicTask dynamicTask;
@@ -641,16 +649,13 @@
        }
        // 查询通道使用状态
        if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
            logger.warn("语音广播已经开启: {}", channelId);
            event.call("语音广播已经开启");
            return;
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                logger.warn("语音广播已经开启: {}", channelId);
                event.call("语音广播已经开启");
                return;
            }
        }
        String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId;
        dynamicTask.startDelay(timeOutTaskKey, ()->{
            logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId);
            event.call("语音广播发送超时");
            audioBroadcastManager.del(device.getDeviceId(), channelId);
        }, timeout * 1000);
        // 发送通知
        cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
@@ -658,11 +663,38 @@
            AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready);
            audioBroadcastManager.add(audioBroadcastCatch);
        }, eventResultForError -> {
            dynamicTask.stop(timeOutTaskKey);
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
            event.call("语音广播发送失败");
            audioBroadcastManager.del(device.getDeviceId(), channelId);
            stopAudioBroadcast(device.getDeviceId(), channelId);
        });
    }
    @Override
    public void stopAudioBroadcast(String deviceId, String channelId){
        AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
        if (audioBroadcastCatch != null) {
            audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
        }
        try {
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null);
            if (sendRtpItem != null) {
                redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>();
                param.put("vhost", "__defaultVhost__");
                param.put("app", sendRtpItem.getApp());
                param.put("stream", sendRtpItem.getStreamId());
                zlmresTfulUtils.stopSendRtp(mediaInfo, param);
            }
            if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
                cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
            }
        } catch (SipException e) {
            throw new RuntimeException(e);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -319,6 +319,22 @@
        return result;
    }
    @ApiOperation("停止语音广播")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class),
            @ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class),
    })
    @GetMapping("/broadcast/stop/{deviceId}/{channelId}")
    @PostMapping("/broadcast/stop/{deviceId}/{channelId}")
    public WVPResult<String> stopBroadcastA(@PathVariable String deviceId, @PathVariable String channelId) {
        if (logger.isDebugEnabled()) {
            logger.debug("停止语音广播API调用");
        }
        playService.stopAudioBroadcast(deviceId, channelId);
        return new WVPResult<>(0, "success", null);
    }
    @ApiOperation("获取所有的ssrc")
    @GetMapping("/ssrc")
    public WVPResult<JSONObject> getSsrc() {