src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
@@ -1,6 +1,11 @@ package com.genersoft.iot.vmp.gb28181.bean; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; import javax.sip.Dialog; /** * 缓存语音广播的状态 * @author lin @@ -32,6 +37,16 @@ */ private AudioBroadcastCatchStatus status; /** * 请求信息 */ private SIPRequest request; /** * 会话信息 */ private SIPDialog dialog; public String getDeviceId() { return deviceId; @@ -56,4 +71,20 @@ public void setStatus(AudioBroadcastCatchStatus status) { this.status = status; } public void setDialog(SIPDialog dialog) { this.dialog = dialog; } public SIPDialog getDialog() { return dialog; } public SIPRequest getRequest() { return request; } public void setRequest(SIPRequest request) { this.request = request; } } src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java
@@ -1,13 +1,14 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; /** * 语音广播消息管理类 @@ -15,6 +16,9 @@ */ @Component public class AudioBroadcastManager { @Autowired private SipConfig config; public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>(); @@ -54,6 +58,16 @@ } public AudioBroadcastCatch get(String deviceId, String channelId) { return data.get(deviceId + channelId); AudioBroadcastCatch audioBroadcastCatch = data.get(deviceId + channelId); if (audioBroadcastCatch == null) { Stream<AudioBroadcastCatch> allAudioBroadcastCatchStreamForDevice = data.values().stream().filter( audioBroadcastCatchItem -> Objects.equals(audioBroadcastCatchItem.getDeviceId(), deviceId)); List<AudioBroadcastCatch> audioBroadcastCatchList = allAudioBroadcastCatchStreamForDevice.collect(Collectors.toList()); if (audioBroadcastCatchList.size() == 1 && Objects.equals(config.getId(), channelId)) { audioBroadcastCatch = audioBroadcastCatchList.get(0); } } return audioBroadcastCatch; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -18,6 +18,8 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,15 +30,18 @@ import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.text.ParseException; import java.util.*; /** * SIP命令类型: ACK请求 * @author lin */ @Component public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { @@ -96,8 +101,8 @@ ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); // 取消设置的超时任务 dynamicTask.stop(callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); // String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); @@ -121,7 +126,14 @@ } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); if (sendRtpItem.isOnlyAudio()) { // TODO 可能是语音对讲 // 语音对讲 try { cmder.streamByeCmd((SIPDialog) evt.getDialog(), (SIPRequest)evt.getRequest(), null); } catch (SipException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } }else { // 向上级平台 commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -65,6 +66,9 @@ @Autowired private VideoStreamSessionManager streamSession; @Autowired private IPlayService playService; @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -106,6 +110,9 @@ if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); } if (sendRtpItem.isOnlyAudio()) { playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); messageForPushChannel.setType(0); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -114,6 +114,7 @@ private SipConfig config; @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -492,7 +493,6 @@ gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); @@ -562,25 +562,16 @@ } } public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException { // 兼容奇葩的海康这里使用的不是通道编号而是本平台编号 // if (channelId.equals(config.getId())) { // List<AudioBroadcastCatch> all = audioBroadcastManager.getAll(); // for (AudioBroadcastCatch audioBroadcastCatch : all) { // if (audioBroadcastCatch.getDeviceId().equals(requesterId)) { // channelId = audioBroadcastCatch.getChannelId(); // } // } // } // // 兼容失败 // if (channelId.equals(config.getId())) { // responseAck(evt, Response.BAD_REQUEST); // return; // } // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1); if (audioBroadcastCatch == null) { logger.warn("来自设备的Invite请求非语音广播,已忽略"); responseAck(evt, Response.FORBIDDEN); return; } Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); @@ -606,7 +597,6 @@ // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (int i = 0; i < mediaDescriptions.size(); i++) { @@ -638,7 +628,6 @@ responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String sessionName = sdp.getSessionName().getValue(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); @@ -649,20 +638,19 @@ return; } SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, device.getDeviceId(), audioBroadcastCatch.getChannelId(), mediaTransmissionTCP); sendRtpItem.setTcp(mediaTransmissionTCP); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } sendRtpItem.setTcp(mediaTransmissionTCP); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } String app = "broadcast"; String stream = device.getDeviceId() + "_" + channelId; String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); sendRtpItem.setPlayType(InviteStreamType.PLAY); @@ -685,12 +673,9 @@ subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); String finalSsrc = ssrc; String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId; // 流已经存在时直接推流 if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { logger.info("发现已经在推流"); dynamicTask.stop(waiteStreamTimeoutTaskKey); sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); @@ -711,6 +696,10 @@ parentPlatform.setServerGBId(device.getDeviceId()); try { responseSdpAck(evt, content.toString(), parentPlatform); Dialog dialog = evt.getDialog(); audioBroadcastCatch.setDialog((SIPDialog) dialog); audioBroadcastCatch.setRequest((SIPRequest) request); audioBroadcastManager.update(audioBroadcastCatch); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { @@ -721,19 +710,16 @@ }else { // 流不存在时监听流上线 // 设置等待推流的超时; 默认20s String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ logger.info("等待推流超时: {}/{}", app, stream); if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { audioBroadcastManager.del(device.getDeviceId(), channelId); }else { // 兼容海康使用了错误的通道ID的情况 audioBroadcastManager.delByDeviceId(device.getDeviceId()); } playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); // 发送bye try { cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null); responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); @@ -743,10 +729,11 @@ subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ sendRtpItem.setStatus(2); dynamicTask.stop(waiteStreamTimeoutTaskKey); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); @@ -771,8 +758,6 @@ } }); } String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; dynamicTask.stop(timeOutTaskKey); String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); wvpResult.setCode(0); src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -43,4 +43,5 @@ StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); void stopAudioBroadcast(String deviceId, String channelId); } src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; @@ -44,9 +45,13 @@ import org.springframework.web.context.request.async.DeferredResult; import javax.sip.ResponseEvent; import javax.sip.SipException; import java.io.FileNotFoundException; import java.math.BigDecimal; import java.text.ParseException; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -92,6 +97,9 @@ @Autowired private UserSetting userSetting; @Autowired private SipConfig sipConfig; @Autowired private DynamicTask dynamicTask; @@ -641,16 +649,13 @@ } // 查询通道使用状态 if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); return; SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); return; } } String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; dynamicTask.startDelay(timeOutTaskKey, ()->{ logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId); event.call("语音广播发送超时"); audioBroadcastManager.del(device.getDeviceId(), channelId); }, timeout * 1000); // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { @@ -658,11 +663,38 @@ AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); audioBroadcastManager.add(audioBroadcastCatch); }, eventResultForError -> { dynamicTask.stop(timeOutTaskKey); // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); event.call("语音广播发送失败"); audioBroadcastManager.del(device.getDeviceId(), channelId); stopAudioBroadcast(device.getDeviceId(), channelId); }); } @Override public void stopAudioBroadcast(String deviceId, String channelId){ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); if (audioBroadcastCatch != null) { audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); } try { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null); if (sendRtpItem != null) { redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map<String, Object> param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); param.put("stream", sendRtpItem.getStreamId()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); } if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null); } } catch (SipException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } } } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -319,6 +319,22 @@ return result; } @ApiOperation("停止语音广播") @ApiImplicitParams({ @ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class), @ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class), }) @GetMapping("/broadcast/stop/{deviceId}/{channelId}") @PostMapping("/broadcast/stop/{deviceId}/{channelId}") public WVPResult<String> stopBroadcastA(@PathVariable String deviceId, @PathVariable String channelId) { if (logger.isDebugEnabled()) { logger.debug("停止语音广播API调用"); } playService.stopAudioBroadcast(deviceId, channelId); return new WVPResult<>(0, "success", null); } @ApiOperation("获取所有的ssrc") @GetMapping("/ssrc") public WVPResult<JSONObject> getSsrc() {