src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -4,9 +4,8 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -82,6 +81,9 @@ @Autowired private ISIPCommanderForPlatform commanderForPlatform; @Autowired private AudioBroadcastManager audioBroadcastManager; /** * 处理 ACK请求 @@ -122,6 +124,13 @@ if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { if (sendRtpItem.isOnlyAudio()) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog()); audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest()); audioBroadcastManager.update(audioBroadcastCatch); } logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -91,7 +91,7 @@ if (dialog.getState().equals(DialogState.TERMINATED)) { String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId()); logger.info("收到bye, [{}/{}]", platformGbId, channelId); if (sendRtpItem != null){ String streamId = sendRtpItem.getStreamId(); @@ -103,15 +103,15 @@ logger.info("收到bye:停止向上级推流:" + streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); redisCatchStorage.deleteSendRTPServer(platformGbId, sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount <= 0) { logger.info("收到bye: {} 无其它观看者,通知设备停止推流", streamId); if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); cmder.streamByeCmd(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId(), streamId, null); } if (sendRtpItem.isOnlyAudio()) { playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId); playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -713,6 +713,7 @@ String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ logger.info("等待推流超时: {}/{}", app, stream); subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); // 发送bye try { @@ -728,35 +729,42 @@ subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ sendRtpItem.setStatus(2); dynamicTask.stop(waiteStreamTimeoutTaskKey); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n"); content.append("y="+ finalSsrc + "\r\n"); content.append("f=v/////a/1/8/1\r\n"); logger.info("收到语音对讲推流"); try { sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n"); content.append("y="+ finalSsrc + "\r\n"); content.append("f=v/////a/1/8/1\r\n"); ParentPlatform parentPlatform = new ParentPlatform(); parentPlatform.setServerIP(device.getIp()); parentPlatform.setServerPort(device.getPort()); parentPlatform.setServerGBId(device.getDeviceId()); try { responseSdpAck(evt, content.toString(), parentPlatform); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } }); ParentPlatform parentPlatform = new ParentPlatform(); parentPlatform.setServerIP(device.getIp()); parentPlatform.setServerPort(device.getPort()); parentPlatform.setServerGBId(device.getDeviceId()); responseSdpAck(evt, content.toString(), parentPlatform); Dialog dialog = evt.getDialog(); audioBroadcastCatch.setDialog((SIPDialog) dialog); audioBroadcastCatch.setRequest((SIPRequest) request); audioBroadcastManager.update(audioBroadcastCatch); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } catch (SdpParseException e) { throw new RuntimeException(e); } }); } String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -675,26 +675,27 @@ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); if (audioBroadcastCatch != null) { audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); } try { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null); if (sendRtpItem != null) { redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map<String, Object> param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); param.put("stream", sendRtpItem.getStreamId()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); try { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); if (sendRtpItem != null) { redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map<String, Object> param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); param.put("stream", sendRtpItem.getStreamId()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); } if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null); } } catch (SipException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null); } } catch (SipException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } } }