优化语音对讲:支持根据设备设置是否在收到ACK后开始发流
| | |
| | | |
| | | private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; |
| | | |
| | | private Boolean pushStreamAfterAck = Boolean.FALSE; |
| | | |
| | | private Boolean sipLog = Boolean.FALSE; |
| | | private Boolean sqlLog = Boolean.FALSE; |
| | | private Boolean sendToPlatformsWhenIdLost = Boolean.FALSE; |
| | |
| | | |
| | | /** |
| | | * Stores the {@code broadcastForPlatform} setting exactly as supplied. |
| | | * |
| | | * @param broadcastForPlatform the new value for the setting |
| | | */ |
| | | public void setBroadcastForPlatform(String broadcastForPlatform) { |
| | | this.broadcastForPlatform = broadcastForPlatform; |
| | | } |
| | | |
| | | /** |
| | | * Returns the global "push stream after ACK" switch. |
| | | * The value is a boxed {@code Boolean} that defaults to {@code Boolean.FALSE}. |
| | | * |
| | | * @return the current value of the setting |
| | | */ |
| | | public Boolean getPushStreamAfterAck() { |
| | | return this.pushStreamAfterAck; |
| | | } |
| | | |
| | | /** |
| | | * Replaces the global "push stream after ACK" switch. |
| | | * |
| | | * @param pushStreamAfterAck the new value, stored as given |
| | | */ |
| | | public void setPushStreamAfterAck(Boolean pushStreamAfterAck) { |
| | | this.pushStreamAfterAck = pushStreamAfterAck; |
| | | } |
| | | |
| | | public Boolean getSipUseSourceIpAsRemoteAddress() { |
| | |
| | | @Schema(description = "设备注册的事务信息") |
| | | private SipTransactionInfo sipTransactionInfo; |
| | | |
| | | |
| | | |
| | | // Per-device switch for the audio broadcast flow: when true, pushing starts once the ACK |
| | | // arrives instead of right after the 200 OK is sent. |
| | | @Schema(description = "控制语音对讲流程,是否收到ACK后发流") |
| | | private boolean broadcastPushAfterAck; |
| | | |
| | | public String getDeviceId() { |
| | | return deviceId; |
| | |
| | | /*======================设备主子码流逻辑END=========================*/ |
| | | |
| | | |
| | | /** |
| | | * Tells whether audio broadcast to this device should start pushing only after the ACK is received. |
| | | * |
| | | * @return {@code true} when pushing is deferred until the ACK |
| | | */ |
| | | public boolean isBroadcastPushAfterAck() { |
| | | return this.broadcastPushAfterAck; |
| | | } |
| | | |
| | | /** |
| | | * Updates the per-device "push after ACK" flag used by the audio broadcast flow. |
| | | * |
| | | * @param broadcastPushAfterAck the new flag value |
| | | */ |
| | | public void setBroadcastPushAfterAck(boolean broadcastPushAfterAck) { |
| | | this.broadcastPushAfterAck = broadcastPushAfterAck; |
| | | } |
| | | } |
| | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.genersoft.iot.vmp.conf.DynamicTask; |
| | | import com.genersoft.iot.vmp.conf.UserSetting; |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; |
| | |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| | | import com.genersoft.iot.vmp.service.IDeviceService; |
| | | import com.genersoft.iot.vmp.service.IMediaServerService; |
| | | import com.genersoft.iot.vmp.service.IPlayService; |
| | | import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; |
| | |
| | | private IVideoManagerStorage storager; |
| | | |
| | | @Autowired |
| | | private IDeviceService deviceService; |
| | | |
| | | @Autowired |
| | | private ZLMRTPServerFactory zlmrtpServerFactory; |
| | | |
| | | @Autowired |
| | |
| | | @Override |
| | | public void process(RequestEvent evt) { |
| | | CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); |
| | | String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); |
| | | String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); |
| | | logger.info("[收到ACK]: 来自->{}", fromUserId); |
| | | SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); |
| | | if (sendRtpItem == null) { |
| | | logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId); |
| | | return; |
| | | } |
| | | logger.info("[收到ACK]:rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), |
| | | sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); |
| | | // 取消设置的超时任务 |
| | | dynamicTask.stop(callIdHeader.getCallId()); |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(fromUserId); |
| | | |
| | | String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); |
| | | logger.info("[收到ACK]: platformGbId->{}", platformGbId); |
| | | if (userSetting.getPushStreamAfterAck()) { |
| | | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); |
| | | // 取消设置的超时任务 |
| | | dynamicTask.stop(callIdHeader.getCallId()); |
| | | String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); |
| | | SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); |
| | | if (sendRtpItem == null) { |
| | | logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); |
| | | return; |
| | | } |
| | | String isUdp = sendRtpItem.isTcp() ? "0" : "1"; |
| | | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| | | logger.info("收到ACK,rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), |
| | | sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); |
| | | Map<String, Object> param = new HashMap<>(12); |
| | | param.put("vhost","__defaultVhost__"); |
| | | param.put("app",sendRtpItem.getApp()); |
| | | param.put("stream",sendRtpItem.getStream()); |
| | | param.put("ssrc", sendRtpItem.getSsrc()); |
| | | param.put("dst_url",sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | param.put("src_port", sendRtpItem.getLocalPort()); |
| | | param.put("pt", sendRtpItem.getPt()); |
| | | param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); |
| | | param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); |
| | | param.put("is_udp", isUdp); |
| | | if (!sendRtpItem.isTcp()) { |
| | | // udp模式下开启rtcp保活 |
| | | param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); |
| | | } |
| | | |
| | | if (parentPlatform != null) { |
| | | Map<String, Object> param = getSendRtpParam(sendRtpItem); |
| | | if (mediaInfo == null) { |
| | | RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( |
| | | sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), |
| | |
| | | playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); |
| | | }); |
| | | } else { |
| | | // 如果是非严格模式,需要关闭端口占用 |
| | | JSONObject startSendRtpStreamResult = null; |
| | | if (sendRtpItem.getLocalPort() != 0) { |
| | | if (sendRtpItem.isTcpActive()) { |
| | | startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); |
| | | }else { |
| | | param.put("dst_url", sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | } |
| | | }else { |
| | | if (sendRtpItem.isTcpActive()) { |
| | | startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); |
| | | }else { |
| | | param.put("dst_url", sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | } |
| | | } |
| | | JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); |
| | | if (startSendRtpStreamResult != null) { |
| | | playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); |
| | | } |
| | | } |
| | | }else { |
| | | Device device = deviceService.getDevice(fromUserId); |
| | | if (device == null) { |
| | | logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId()); |
| | | return; |
| | | } |
| | | // 设置为收到ACK后发送语音的设备已经在发送200OK开始发流了 |
| | | if (!device.isBroadcastPushAfterAck()) { |
| | | return; |
| | | } |
| | | if (mediaInfo == null) { |
| | | logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId()); |
| | | return; |
| | | } |
| | | Map<String, Object> param = getSendRtpParam(sendRtpItem); |
| | | JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); |
| | | if (startSendRtpStreamResult != null) { |
| | | playService.startSendRtpStreamHand(sendRtpItem, device, startSendRtpStreamResult, param, callIdHeader); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) { |
| | | String isUdp = sendRtpItem.isTcp() ? "0" : "1"; |
| | | Map<String, Object> param = new HashMap<>(12); |
| | | param.put("vhost","__defaultVhost__"); |
| | | param.put("app",sendRtpItem.getApp()); |
| | | param.put("stream",sendRtpItem.getStream()); |
| | | param.put("ssrc", sendRtpItem.getSsrc()); |
| | | param.put("dst_url",sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | param.put("src_port", sendRtpItem.getLocalPort()); |
| | | param.put("pt", sendRtpItem.getPt()); |
| | | param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); |
| | | param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); |
| | | param.put("is_udp", isUdp); |
| | | if (!sendRtpItem.isTcp()) { |
| | | // udp模式下开启rtcp保活 |
| | | param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); |
| | | } |
| | | return param; |
| | | } |
| | | |
| | | /** |
| | | * Asks the media server to start sending RTP for the given send item. |
| | | * <p> |
| | | * For a TCP-active item the passive-send call is issued with {@code param} unchanged. Otherwise |
| | | * {@code dst_url} and {@code dst_port} are written into {@code param} from the item and the |
| | | * regular send call is issued. The local port does not influence the choice: the former |
| | | * {@code getLocalPort() != 0} check had two identical branches and has been removed. |
| | | * |
| | | * @param sendRtpItem item describing the target address and transport mode |
| | | * @param mediaInfo media server handed to {@code zlmrtpServerFactory} |
| | | * @param param request parameters; mutated when the item is not TCP-active |
| | | * @return whatever {@code zlmrtpServerFactory} returns; callers null-check the result |
| | | */ |
| | | private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){ |
| | | if (sendRtpItem.isTcpActive()) { |
| | | return zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); |
| | | } |
| | | param.put("dst_url", sendRtpItem.getIp()); |
| | | param.put("dst_port", sendRtpItem.getPort()); |
| | | return zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); |
| | | } |
| | | |
| | | } |
| | |
| | | |
| | | try { |
| | | // 超时未收到Ack应该回复bye,当前等待时间为10秒 |
| | | if (userSetting.getPushStreamAfterAck()) { |
| | | dynamicTask.startDelay(callIdHeader.getCallId(), () -> { |
| | | logger.info("Ack 等待超时"); |
| | | mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); |
| | | // 回复bye |
| | | try { |
| | | cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); |
| | | } |
| | | }, 60 * 1000); |
| | | } |
| | | dynamicTask.startDelay(callIdHeader.getCallId(), () -> { |
| | | logger.info("Ack 等待超时"); |
| | | mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); |
| | | // 回复bye |
| | | try { |
| | | cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); |
| | | } |
| | | }, 60 * 1000); |
| | | |
| | | SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); |
| | | if (!userSetting.getPushStreamAfterAck()) { |
| | | playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); |
| | | } |
| | | responseSdpAck(request, content.toString(), platform); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 回复SdpAck", e); |
| | | } |
| | |
| | | if (response != null) { |
| | | sendRtpItem.setToTag(response.getToTag()); |
| | | } |
| | | |
| | | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| | | |
| | | } else { |
| | |
| | | content.append("f=\r\n"); |
| | | |
| | | try { |
| | | SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); |
| | | if (!userSetting.getPushStreamAfterAck()) { |
| | | playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); |
| | | } |
| | | return sipResponse; |
| | | } catch (SipException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } catch (InvalidArgumentException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } catch (ParseException e) { |
| | | return responseSdpAck(request, content.toString(), platform); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("未处理的异常 ", e); |
| | | } |
| | | return null; |
| | |
| | | audioBroadcastManager.update(audioBroadcastCatch); |
| | | |
| | | // 开启发流,大华在收到200OK后就会开始建立连接 |
| | | if (!userSetting.getPushStreamAfterAck()) { |
| | | if (!device.isBroadcastPushAfterAck()) { |
| | | playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); |
| | | } |
| | | |
| | |
| | | |
| | | void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader); |
| | | |
| | | void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, |
| | | void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo, |
| | | JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader); |
| | | |
| | | void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event); |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, |
| | | public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo, |
| | | JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) { |
| | | if (jsonObject == null) { |
| | | logger.error("RTP推流失败: 请检查ZLM服务"); |
| | |
| | | } |
| | | } else { |
| | | // 向上级平台 |
| | | try { |
| | | commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); |
| | | if (correlationInfo instanceof ParentPlatform) { |
| | | try { |
| | | ParentPlatform parentPlatform = (ParentPlatform)correlationInfo; |
| | | commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); |
| | | } catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | "on_line," + |
| | | "media_server_id," + |
| | | "switch_primary_sub_stream," + |
| | | "broadcast_push_after_ack," + |
| | | "(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+ |
| | | " FROM wvp_device WHERE device_id = #{deviceId}") |
| | | Device getDeviceByDeviceId(String deviceId); |
| | |
| | | "subscribe_cycle_for_alarm,"+ |
| | | "ssrc_check,"+ |
| | | "as_message_channel,"+ |
| | | "broadcast_push_after_ack,"+ |
| | | "geo_coord_sys,"+ |
| | | "on_line"+ |
| | | ") VALUES (" + |
| | |
| | | "#{subscribeCycleForAlarm}," + |
| | | "#{ssrcCheck}," + |
| | | "#{asMessageChannel}," + |
| | | "#{broadcastPushAfterAck}," + |
| | | "#{geoCoordSys}," + |
| | | "#{onLine}" + |
| | | ")") |
| | |
| | | "subscribe_cycle_for_alarm,"+ |
| | | "ssrc_check,"+ |
| | | "as_message_channel,"+ |
| | | "broadcast_push_after_ack,"+ |
| | | "geo_coord_sys,"+ |
| | | "on_line,"+ |
| | | "media_server_id,"+ |
| | |
| | | "subscribe_cycle_for_alarm,"+ |
| | | "ssrc_check,"+ |
| | | "as_message_channel,"+ |
| | | "broadcast_push_after_ack,"+ |
| | | "geo_coord_sys,"+ |
| | | "on_line"+ |
| | | " FROM wvp_device WHERE on_line = true") |
| | |
| | | "subscribe_cycle_for_alarm,"+ |
| | | "ssrc_check,"+ |
| | | "as_message_channel,"+ |
| | | "broadcast_push_after_ack,"+ |
| | | "geo_coord_sys,"+ |
| | | "on_line"+ |
| | | " FROM wvp_device WHERE ip = #{host} AND port=#{port}") |
| | |
| | | "<if test=\"subscribeCycleForAlarm != null\">, subscribe_cycle_for_alarm=#{subscribeCycleForAlarm}</if>" + |
| | | "<if test=\"ssrcCheck != null\">, ssrc_check=#{ssrcCheck}</if>" + |
| | | "<if test=\"asMessageChannel != null\">, as_message_channel=#{asMessageChannel}</if>" + |
| | | "<if test=\"broadcastPushAfterAck != null\">, broadcast_push_after_ack=#{broadcastPushAfterAck}</if>" + |
| | | "<if test=\"geoCoordSys != null\">, geo_coord_sys=#{geoCoordSys}</if>" + |
| | | "<if test=\"switchPrimarySubStream != null\">, switch_primary_sub_stream=#{switchPrimarySubStream}</if>" + |
| | | "<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" + |
| | |
| | | "charset,"+ |
| | | "ssrc_check,"+ |
| | | "as_message_channel,"+ |
| | | // Column names are snake_case; the camelCase form is only valid as the #{...} property name. |
| | | "broadcast_push_after_ack,"+ |
| | | "geo_coord_sys,"+ |
| | | "on_line,"+ |
| | | "media_server_id,"+ |
| | |
| | | "#{charset}," + |
| | | "#{ssrcCheck}," + |
| | | "#{asMessageChannel}," + |
| | | "#{broadcastPushAfterAck}," + |
| | | "#{geoCoordSys}," + |
| | | "#{onLine}," + |
| | | "#{mediaServerId}," + |
| | |
| | | <el-form-item label="其他选项"> |
| | | <el-checkbox label="SSRC校验" v-model="form.ssrcCheck" style="float: left"></el-checkbox> |
| | | <el-checkbox label="作为消息通道" v-model="form.asMessageChannel" style="float: left"></el-checkbox> |
| | | <el-checkbox label="收到ACK后发流" v-model="form.broadcastPushAfterAck" style="float: left"></el-checkbox> |
| | | </el-form-item> |
| | | <el-form-item> |
| | | <div style="float: right;"> |