648540858
2023-07-02 4604aaea99925415db8d9efe1d7e68d6f59e93c8
优化语音对讲,支持根据设备设置是否收到ACK后开始发流
8个文件已修改
1个文件已添加
227 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/local.jks 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/components/dialog/deviceEdit.vue 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -47,8 +47,6 @@
    private Boolean syncChannelOnDeviceOnline = Boolean.FALSE;
    private Boolean pushStreamAfterAck = Boolean.FALSE;
    private Boolean sipLog = Boolean.FALSE;
    private Boolean sqlLog = Boolean.FALSE;
    private Boolean sendToPlatformsWhenIdLost = Boolean.FALSE;
@@ -232,14 +230,6 @@
    public void setBroadcastForPlatform(String broadcastForPlatform) {
        this.broadcastForPlatform = broadcastForPlatform;
    }
    /**
     * Returns the global {@code pushStreamAfterAck} setting.
     *
     * @return the current value of the flag (the field is initialised to {@code Boolean.FALSE})
     */
    public Boolean getPushStreamAfterAck() {
        return pushStreamAfterAck;
    }
    /**
     * Sets the global {@code pushStreamAfterAck} setting.
     *
     * @param pushStreamAfterAck the new value of the flag
     */
    public void setPushStreamAfterAck(Boolean pushStreamAfterAck) {
        this.pushStreamAfterAck = pushStreamAfterAck;
    }
    public Boolean getSipUseSourceIpAsRemoteAddress() {
src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java
@@ -188,8 +188,8 @@
    @Schema(description = "设备注册的事务信息")
    private SipTransactionInfo sipTransactionInfo;
    @Schema(description = "控制语音对讲流程,释放收到ACK后发流")
    private boolean broadcastPushAfterAck;
    public String getDeviceId() {
        return deviceId;
@@ -465,4 +465,11 @@
    /*======================设备主子码流逻辑END=========================*/
    /**
     * Returns the device's {@code broadcastPushAfterAck} flag.
     *
     * @return {@code true} if the flag is set for this device
     */
    public boolean isBroadcastPushAfterAck() {
        return broadcastPushAfterAck;
    }
    /**
     * Sets the device's {@code broadcastPushAfterAck} flag.
     *
     * @param broadcastPushAfterAck the new value of the flag
     */
    public void setBroadcastPushAfterAck(boolean broadcastPushAfterAck) {
        this.broadcastPushAfterAck = broadcastPushAfterAck;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@@ -10,9 +11,8 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
@@ -63,6 +63,9 @@
    private IVideoManagerStorage storager;
    @Autowired
    private IDeviceService deviceService;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
@@ -87,40 +90,23 @@
    @Override
    public void process(RequestEvent evt) {
        CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
        String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
        String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
        logger.info("[收到ACK]: 来自->{}", fromUserId);
        SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
        if (sendRtpItem == null) {
            logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId);
            return;
        }
        logger.info("[收到ACK]:rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        // 取消设置的超时任务
        dynamicTask.stop(callIdHeader.getCallId());
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(fromUserId);
        String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
        logger.info("[收到ACK]: platformGbId->{}", platformGbId);
        if (userSetting.getPushStreamAfterAck()) {
            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId);
            // 取消设置的超时任务
            dynamicTask.stop(callIdHeader.getCallId());
            String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
            if (sendRtpItem == null) {
                logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
                return;
            }
            String isUdp = sendRtpItem.isTcp() ? "0" : "1";
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            logger.info("收到ACK,rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
            Map<String, Object> param = new HashMap<>(12);
            param.put("vhost","__defaultVhost__");
            param.put("app",sendRtpItem.getApp());
            param.put("stream",sendRtpItem.getStream());
            param.put("ssrc", sendRtpItem.getSsrc());
            param.put("dst_url",sendRtpItem.getIp());
            param.put("dst_port", sendRtpItem.getPort());
            param.put("src_port", sendRtpItem.getLocalPort());
            param.put("pt", sendRtpItem.getPt());
            param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
            param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
            param.put("is_udp", isUdp);
            if (!sendRtpItem.isTcp()) {
                // udp模式下开启rtcp保活
                param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
            }
        if (parentPlatform != null) {
            Map<String, Object> param = getSendRtpParam(sendRtpItem);
            if (mediaInfo == null) {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                        sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
@@ -130,30 +116,75 @@
                    playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
                });
            } else {
                // 如果是非严格模式,需要关闭端口占用
                JSONObject startSendRtpStreamResult = null;
                if (sendRtpItem.getLocalPort() != 0) {
                    if (sendRtpItem.isTcpActive()) {
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
                    }else {
                        param.put("dst_url", sendRtpItem.getIp());
                        param.put("dst_port", sendRtpItem.getPort());
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                    }
                }else {
                    if (sendRtpItem.isTcpActive()) {
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
                    }else {
                        param.put("dst_url", sendRtpItem.getIp());
                        param.put("dst_port", sendRtpItem.getPort());
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                    }
                }
                JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
                if (startSendRtpStreamResult != null) {
                    playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
                }
            }
        }else {
            Device device = deviceService.getDevice(fromUserId);
            if (device == null) {
                logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId());
                return;
            }
            // 设置为收到ACK后发送语音的设备已经在发送200OK开始发流了
            if (!device.isBroadcastPushAfterAck()) {
                return;
            }
            if (mediaInfo == null) {
                logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId());
                return;
            }
            Map<String, Object> param = getSendRtpParam(sendRtpItem);
            JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
            if (startSendRtpStreamResult != null) {
                playService.startSendRtpStreamHand(sendRtpItem, device, startSendRtpStreamResult, param, callIdHeader);
            }
        }
    }
    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("dst_url",sendRtpItem.getIp());
        param.put("dst_port", sendRtpItem.getPort());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", isUdp);
        if (!sendRtpItem.isTcp()) {
            // udp模式下开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
        }
        return param;
    }
    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
        JSONObject startSendRtpStreamResult = null;
        if (sendRtpItem.getLocalPort() != 0) {
            if (sendRtpItem.isTcpActive()) {
                startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
            }else {
                param.put("dst_url", sendRtpItem.getIp());
                param.put("dst_port", sendRtpItem.getPort());
                startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
            }
        }else {
            if (sendRtpItem.isTcpActive()) {
                startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
            }else {
                param.put("dst_url", sendRtpItem.getIp());
                param.put("dst_port", sendRtpItem.getPort());
                startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
            }
        }
        return startSendRtpStreamResult;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -427,23 +427,18 @@
                        try {
                            // 超时未收到Ack应该回复bye,当前等待时间为10秒
                            if (userSetting.getPushStreamAfterAck()) {
                                dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                                    logger.info("Ack 等待超时");
                                    mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
                                    // 回复bye
                                    try {
                                        cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
                                        logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                                    }
                                }, 60 * 1000);
                            }
                            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                                logger.info("Ack 等待超时");
                                mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
                                // 回复bye
                                try {
                                    cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                                }
                            }, 60 * 1000);
                            SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
                            if (!userSetting.getPushStreamAfterAck()) {
                                playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
                            }
                             responseSdpAck(request, content.toString(), platform);
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
                        }
@@ -650,7 +645,6 @@
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
@@ -888,16 +882,8 @@
        content.append("f=\r\n");
        try {
            SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
            if (!userSetting.getPushStreamAfterAck()) {
                playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
            }
            return sipResponse;
        } catch (SipException e) {
            logger.error("未处理的异常 ", e);
        } catch (InvalidArgumentException e) {
            logger.error("未处理的异常 ", e);
        } catch (ParseException e) {
            return responseSdpAck(request, content.toString(), platform);
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("未处理的异常 ", e);
        }
        return null;
@@ -1132,7 +1118,7 @@
            audioBroadcastManager.update(audioBroadcastCatch);
            // 开启发流,大华在收到200OK后就会开始建立连接
            if (!userSetting.getPushStreamAfterAck()) {
            if (!device.isBroadcastPushAfterAck()) {
                playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader());
            }
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -64,7 +64,7 @@
    void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader);
    void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
    void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
                                JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader);
    void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event);
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1481,7 +1481,7 @@
    }
    @Override
    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
                                       JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("RTP推流失败: 请检查ZLM服务");
@@ -1504,10 +1504,13 @@
                }
            } else {
                // 向上级平台
                try {
                    commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                if (correlationInfo instanceof ParentPlatform) {
                    try {
                        ParentPlatform parentPlatform = (ParentPlatform)correlationInfo;
                        commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                    }
                }
            }
        }
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java
@@ -43,6 +43,7 @@
            "on_line," +
            "media_server_id," +
            "switch_primary_sub_stream," +
            "broadcast_push_after_ack," +
            "(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+
            " FROM wvp_device WHERE device_id = #{deviceId}")
    Device getDeviceByDeviceId(String deviceId);
@@ -73,6 +74,7 @@
                "subscribe_cycle_for_alarm,"+
                "ssrc_check,"+
                "as_message_channel,"+
                "broadcast_push_after_ack,"+
                "geo_coord_sys,"+
                "on_line"+
            ") VALUES (" +
@@ -101,6 +103,7 @@
                "#{subscribeCycleForAlarm}," +
                "#{ssrcCheck}," +
                "#{asMessageChannel}," +
                "#{broadcastPushAfterAck}," +
                "#{geoCoordSys}," +
                "#{onLine}" +
            ")")
@@ -155,6 +158,7 @@
            "subscribe_cycle_for_alarm,"+
            "ssrc_check,"+
            "as_message_channel,"+
            "broadcast_push_after_ack,"+
            "geo_coord_sys,"+
            "on_line,"+
            "media_server_id,"+
@@ -196,6 +200,7 @@
            "subscribe_cycle_for_alarm,"+
            "ssrc_check,"+
            "as_message_channel,"+
            "broadcast_push_after_ack,"+
            "geo_coord_sys,"+
            "on_line"+
            " FROM wvp_device WHERE on_line = true")
@@ -226,6 +231,7 @@
            "subscribe_cycle_for_alarm,"+
            "ssrc_check,"+
            "as_message_channel,"+
            "broadcast_push_after_ack,"+
            "geo_coord_sys,"+
            "on_line"+
            " FROM wvp_device WHERE ip = #{host} AND port=#{port}")
@@ -247,6 +253,7 @@
            "<if test=\"subscribeCycleForAlarm != null\">, subscribe_cycle_for_alarm=#{subscribeCycleForAlarm}</if>" +
            "<if test=\"ssrcCheck != null\">, ssrc_check=#{ssrcCheck}</if>" +
            "<if test=\"asMessageChannel != null\">, as_message_channel=#{asMessageChannel}</if>" +
            "<if test=\"broadcastPushAfterAck != null\">, broadcast_push_after_ack=#{broadcastPushAfterAck}</if>" +
            "<if test=\"geoCoordSys != null\">, geo_coord_sys=#{geoCoordSys}</if>" +
            "<if test=\"switchPrimarySubStream != null\">, switch_primary_sub_stream=#{switchPrimarySubStream}</if>" +
            "<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" +
@@ -264,6 +271,7 @@
            "charset,"+
            "ssrc_check,"+
            "as_message_channel,"+
            "broadcastPushAfterAck,"+
            "geo_coord_sys,"+
            "on_line,"+
            "media_server_id,"+
@@ -278,6 +286,7 @@
            "#{charset}," +
            "#{ssrcCheck}," +
            "#{asMessageChannel}," +
            "#{broadcastPushAfterAck}," +
            "#{geoCoordSys}," +
            "#{onLine}," +
            "#{mediaServerId}," +
src/main/resources/local.jks
Binary files differ
web_src/src/components/dialog/deviceEdit.vue
@@ -70,6 +70,7 @@
          <el-form-item label="其他选项">
            <el-checkbox label="SSRC校验" v-model="form.ssrcCheck" style="float: left"></el-checkbox>
            <el-checkbox label="作为消息通道" v-model="form.asMessageChannel" style="float: left"></el-checkbox>
            <el-checkbox label="收到ACK后发流" v-model="form.broadcastPushAfterAck" style="float: left"></el-checkbox>
          </el-form-item>
          <el-form-item>
            <div style="float: right;">