src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
New file @@ -0,0 +1,59 @@ package com.genersoft.iot.vmp.gb28181.bean; /** * 缓存语音广播的状态 * @author lin */ public class AudioBroadcastCatch { public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) { this.deviceId = deviceId; this.channelId = channelId; this.status = status; } public AudioBroadcastCatch() { } /** * 设备编号 */ private String deviceId; /** * 通道编号 */ private String channelId; /** * 语音广播状态 */ private AudioBroadcastCatchStatus status; public String getDeviceId() { return deviceId; } public void setDeviceId(String deviceId) { this.deviceId = deviceId; } public String getChannelId() { return channelId; } public void setChannelId(String channelId) { this.channelId = channelId; } public AudioBroadcastCatchStatus getStatus() { return status; } public void setStatus(AudioBroadcastCatchStatus status) { this.status = status; } } src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatchStatus.java
New file @@ -0,0 +1,15 @@ package com.genersoft.iot.vmp.gb28181.bean; /** * 语音广播状态 * @author lin */ public enum AudioBroadcastCatchStatus { // 发送语音广播消息等待对方回复语音广播 Ready, // 收到回复等待invite消息 WaiteInvite, // 收到invite消息 Ok, } src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java
@@ -134,16 +134,6 @@ */ private boolean ssrcCheck; /** * 设备用于接收语音消息的通道 */ private String audioChannelForReceive; /** * 设备用于发送语音消息的通道 */ private String audioChannelForSend; public String getDeviceId() { return deviceId; @@ -345,11 +335,4 @@ this.ssrcCheck = ssrcCheck; } public String getAudioChannelForReceive() { return audioChannelForReceive; } public void setAudioChannelForReceive(String audioChannelForReceive) { this.audioChannelForReceive = audioChannelForReceive; } } src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java
// New file @@ -0,0 +1,59 @@
package com.genersoft.iot.vmp.gb28181.session;

import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Manager for audio broadcast messages.
 *
 * <p>Keeps one {@link AudioBroadcastCatch} per device/channel pair in a
 * static {@link ConcurrentHashMap}. Entries are keyed by the device ID
 * concatenated with the channel ID (no separator).
 *
 * @author lin
 */
@Component
public class AudioBroadcastManager {

    public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>();

    /**
     * Stores the given entry; identical to {@link #update(AudioBroadcastCatch)}.
     *
     * @param audioBroadcastCatch entry to store
     */
    public void add(AudioBroadcastCatch audioBroadcastCatch) {
        this.update(audioBroadcastCatch);
    }

    /**
     * Stores the given entry under its device/channel key, replacing any
     * entry already stored under that key.
     *
     * @param audioBroadcastCatch entry to store
     */
    public void update(AudioBroadcastCatch audioBroadcastCatch) {
        data.put(buildKey(audioBroadcastCatch.getDeviceId(), audioBroadcastCatch.getChannelId()), audioBroadcastCatch);
    }

    /**
     * Removes the entry of one device/channel pair, if present.
     *
     * @param deviceId  device ID
     * @param channelId channel ID
     */
    public void del(String deviceId, String channelId) {
        data.remove(buildKey(deviceId, channelId));
    }

    /**
     * Removes every entry that belongs to the given device.
     *
     * <p>Matching is done on the device ID recorded in each stored entry
     * rather than on a key prefix: the key has no separator between device
     * ID and channel ID, so a prefix test would also delete entries of any
     * other device whose ID merely starts with {@code deviceId}.
     *
     * @param deviceId device ID; {@code null} removes nothing
     */
    public void delByDeviceId(String deviceId) {
        if (deviceId == null) {
            return;
        }
        data.values().removeIf(item -> deviceId.equals(item.getDeviceId()));
    }

    /**
     * @return a new list holding a snapshot of all stored entries
     */
    public List<AudioBroadcastCatch> getAll() {
        return new ArrayList<>(data.values());
    }

    /**
     * Tells whether an entry exists for the given device/channel pair.
     *
     * @param deviceId  device ID
     * @param channelId channel ID
     * @return {@code true} when an entry is stored for that pair
     */
    public boolean exit(String deviceId, String channelId) {
        // Direct key lookup instead of scanning the whole key set.
        return data.containsKey(buildKey(deviceId, channelId));
    }

    /**
     * Looks up the entry of one device/channel pair.
     *
     * @param deviceId  device ID
     * @param channelId channel ID
     * @return the stored entry, or {@code null} when there is none
     */
    public AudioBroadcastCatch get(String deviceId, String channelId) {
        return data.get(buildKey(deviceId, channelId));
    }

    /**
     * Builds the map key for a device/channel pair. The format (plain
     * concatenation) is kept unchanged because {@link #data} is public.
     */
    private static String buildKey(String deviceId, String channelId) {
        return deviceId + channelId;
    }
}
// src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -21,9 +21,6 @@ public static Map<String, CatalogData> data = new ConcurrentHashMap<>(); @Autowired private DeferredResultHolder deferredResultHolder; @Autowired private IVideoManagerStorage storager; public void addReady(Device device, int sn ) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -6,8 +6,12 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; import javax.sip.Dialog; import javax.sip.SipException; import java.text.ParseException; /** * @description:设备能力接口,用于定义设备的控制、查询能力 @@ -123,6 +127,7 @@ */ void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent); void streamByeCmd(String deviceId, String channelId, String stream, String callId); void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException; /** * 回放暂停 @@ -144,21 +149,13 @@ */ void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed); /** * 语音广播 * * @param device 视频设备 * @param channelId 预览通道 */ boolean audioBroadcastCmd(Device device,String channelId); /** * 语音广播 * * @param device 视频设备 */ void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent); boolean audioBroadcastCmd(Device device); boolean audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent); /** * 音视频录像控制 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -733,42 +733,34 @@ } } Request byeRequest = dialog.createRequest(Request.BYE); SipURI byeURI = (SipURI) byeRequest.getRequestURI(); SIPRequest request = (SIPRequest)transaction.getRequest(); byeURI.setHost(request.getRemoteAddress().getHostAddress()); byeURI.setPort(request.getRemotePort()); ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME); String protocol = viaHeader.getTransport().toUpperCase(); ClientTransaction clientTransaction = null; if("TCP".equals(protocol)) { clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); } else if("UDP".equals(protocol)) { clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); } CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME); if (okEvent != null) { sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); } dialog.sendRequest(clientTransaction); streamByeCmd(dialog, (SIPRequest)transaction.getRequest(), okEvent); } catch (SipException | ParseException e) { e.printStackTrace(); } } /** * 语音广播 * * @param device 视频设备 * @param channelId 预览通道 */ @Override public boolean audioBroadcastCmd(Device device, String channelId) { // 改为新的实现 return false; public void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException { Request byeRequest = dialog.createRequest(Request.BYE); SipURI byeURI = (SipURI) byeRequest.getRequestURI(); byeURI.setHost(request.getRemoteAddress().getHostAddress()); byeURI.setPort(request.getRemotePort()); ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME); String protocol = viaHeader.getTransport().toUpperCase(); ClientTransaction clientTransaction = null; if("TCP".equals(protocol)) { clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); } else if("UDP".equals(protocol)) { clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); } CallIdHeader callIdHeader = (CallIdHeader) 
byeRequest.getHeader(CallIdHeader.NAME); if (okEvent != null) { sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); } dialog.sendRequest(clientTransaction); } /** @@ -777,7 +769,7 @@ * @param device 视频设备 */ @Override public boolean audioBroadcastCmd(Device device) { public boolean audioBroadcastCmd(Device device,String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { try { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); @@ -786,7 +778,7 @@ broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n"); broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n"); broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n"); broadcastXml.append("<TargetID>" + channelId + "</TargetID>\r\n"); broadcastXml.append("</Notify>\r\n"); String tm = Long.toString(System.currentTimeMillis()); @@ -795,39 +787,14 @@ : udpSipProvider.getNewCallId(); Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader); transmitRequest(device, request); transmitRequest(device, request, errorEvent, okEvent); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } return false; } @Override public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) { try { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); broadcastXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n"); broadcastXml.append("<Notify>\r\n"); broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n"); broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n"); broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n"); 
broadcastXml.append("</Notify>\r\n"); String tm = Long.toString(System.currentTimeMillis()); CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader); transmitRequest(device, request, errorEvent); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } } /** * 音视频录像控制 * src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -94,6 +94,9 @@ param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); param.put("src_port", sendRtpItem.getLocalPort()); param.put("pt", 8); param.put("use_ps", 0); param.put("only_audio", 1); zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -2,21 +2,27 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -24,8 +30,12 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sdp.TimeDescriptionImpl; import 
gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -41,6 +51,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Vector; /** @@ -73,7 +84,7 @@ private IPlayService playService; @Autowired private ISIPCommander commander; private AudioBroadcastManager audioBroadcastManager; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -92,6 +103,15 @@ @Autowired private ZLMMediaListManager mediaListManager; @Autowired private DeferredResultHolder resultHolder; @Autowired private ZLMHttpHookSubscribe subscribe; @Autowired private SipConfig config; @Override @@ -126,7 +146,7 @@ // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { inviteFromDeviceHandle(evt, requesterId); inviteFromDeviceHandle(evt, requesterId, channelId); }else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); @@ -542,10 +562,25 @@ } } public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { // 兼容奇葩的海康这里使用的不是通道编号而是本平台编号 // if (channelId.equals(config.getId())) { // List<AudioBroadcastCatch> all = audioBroadcastManager.getAll(); // for (AudioBroadcastCatch audioBroadcastCatch : all) { // if (audioBroadcastCatch.getDeviceId().equals(requesterId)) { // channelId = audioBroadcastCatch.getChannelId(); // } // } // } // // 兼容失败 // if (channelId.equals(config.getId())) { // responseAck(evt, Response.BAD_REQUEST); // return; // } // 
非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); @@ -558,7 +593,7 @@ int ssrcIndex = contentString.indexOf("y="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim(); } ssrcIndex = substring.indexOf("f="); if (ssrcIndex > 0) { @@ -568,6 +603,7 @@ // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; @@ -602,10 +638,150 @@ responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } String username = sdp.getOrigin().getUsername(); String sessionName = sdp.getSessionName().getValue(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { logger.warn("未找到可用的zlm"); responseAck(evt, Response.BUSY_HERE); return; } SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, mediaTransmissionTCP); sendRtpItem.setTcp(mediaTransmissionTCP); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } String app = "broadcast"; String stream = device.getDeviceId() + "_" + channelId; CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); sendRtpItem.setPlayType(InviteStreamType.PLAY); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlatformId(requesterId); sendRtpItem.setStatus(1); 
sendRtpItem.setApp(app); sendRtpItem.setStreamId(stream); redisCatchStorage.updateSendRTPSever(sendRtpItem); // hook监听等待设备推流上来 // 添加订阅 JSONObject subscribeKey = new JSONObject(); subscribeKey.put("app", app); subscribeKey.put("stream", stream); subscribeKey.put("regist", true); subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); String finalSsrc = ssrc; String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId; if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { logger.info("发现已经在推流"); dynamicTask.stop(waiteStreamTimeoutTaskKey); sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n"); content.append("y="+ finalSsrc + "\r\n"); content.append("f=v/////a/1/8/1\r\n"); ParentPlatform parentPlatform = new ParentPlatform(); parentPlatform.setServerIP(device.getIp()); parentPlatform.setServerPort(device.getPort()); parentPlatform.setServerGBId(device.getDeviceId()); try { responseSdpAck(evt, content.toString(), parentPlatform); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } }else { // 设置等待推流的超时; 默认20s String finalChannelId = channelId; dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ logger.info("等待推流超时: {}/{}", app, stream); if (audioBroadcastManager.exit(device.getDeviceId(), finalChannelId)) { 
audioBroadcastManager.del(device.getDeviceId(), finalChannelId); }else { // 兼容海康使用了错误的通道ID的情况 audioBroadcastManager.delByDeviceId(device.getDeviceId()); } // 发送bye try { cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null); } catch (SipException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } }, 20*1000); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ finalChannelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n"); content.append("y="+ finalSsrc + "\r\n"); content.append("f=v/////a/1/8/1\r\n"); ParentPlatform parentPlatform = new ParentPlatform(); parentPlatform.setServerIP(device.getIp()); parentPlatform.setServerPort(device.getPort()); parentPlatform.setServerGBId(device.getDeviceId()); try { responseSdpAck(evt, content.toString(), parentPlatform); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } }); } String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; dynamicTask.stop(timeOutTaskKey); String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); wvpResult.setCode(0); wvpResult.setMsg("success"); AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); 
audioBroadcastResult.setApp(app); audioBroadcastResult.setStream(stream); audioBroadcastResult.setMediaServerItem(new MediaServerItemLite(mediaServerItem)); audioBroadcastResult.setCodec("G.711"); wvpResult.setData(audioBroadcastResult); RequestMessage requestMessage = new RequestMessage(); requestMessage.setKey(key); requestMessage.setData(wvpResult); resultHolder.invokeAllResult(requestMessage); } else { logger.warn("来自无效设备/平台的请求"); responseAck(evt, Response.BAD_REQUEST); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java
@@ -6,7 +6,11 @@ import org.dom4j.Element; import org.springframework.beans.factory.annotation.Autowired; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -23,6 +27,10 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element element) { String cmd = getText(element, "CmdType"); if (cmd == null) { handNullCmd(evt); return; } IMessageHandler messageHandler = messageHandlerMap.get(cmd); if (messageHandler != null) { messageHandler.handForDevice(evt, device, element); @@ -37,4 +45,17 @@ messageHandler.handForPlatform(evt, parentPlatform, element); } } public void handNullCmd(RequestEvent evt){ try { responseAck(evt, Response.OK); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } return; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java
@@ -1,8 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -36,6 +39,9 @@ @Autowired private DeferredResultHolder deferredResultHolder; @Autowired private AudioBroadcastManager audioBroadcastManager; @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -45,21 +51,16 @@ public void handForDevice(RequestEvent evt, Device device, Element rootElement) { try { String channelId = getText(rootElement, "DeviceID"); String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId() + channelId; // 回复200 OK responseAck(evt, Response.OK); // 此处是对本平台发出Broadcast指令的应答 JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); if (logger.isDebugEnabled()) { logger.debug(json.toJSONString()); if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { // 回复410 responseAck(evt, Response.GONE); return; } RequestMessage msg = new RequestMessage(); msg.setKey(key); msg.setData(json); deferredResultHolder.invokeAllResult(msg); logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId ); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastManager.update(audioBroadcastCatch); responseAck(evt, Response.OK); } catch (ParseException | 
SipException | InvalidArgumentException e) { e.printStackTrace(); } src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -271,7 +271,7 @@ * 查询待转推的流是否就绪 */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", streamId); return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItemLite.java
// New file @@ -0,0 +1,197 @@
package com.genersoft.iot.vmp.media.zlm.dto;

import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import org.springframework.util.StringUtils;

import java.util.HashMap;

/**
 * A trimmed-down copy of {@link MediaServerItem}, convenient for returning
 * data to the front end.
 *
 * <p>Every field is copied one-to-one from the source item by the
 * constructor; the setters allow later adjustment.
 */
public class MediaServerItemLite {

    private String id;

    private String ip;

    private String hookIp;

    private String sdpIp;

    private String streamIp;

    private int httpPort;

    private int httpSSlPort;

    private int rtmpPort;

    private int rtmpSSlPort;

    private int rtpProxyPort;

    private int rtspPort;

    private int rtspSSLPort;

    // NOTE(review): this class is described as data returned to the front end,
    // yet it carries the media server secret. Confirm that exposing it is
    // intended; the accessor is kept so existing callers keep compiling.
    private String secret;

    private int streamNoneReaderDelayMS;

    private int hookAliveInterval;

    private int recordAssistPort;

    /**
     * Copies the lite subset of fields from a full media server item.
     *
     * @param mediaServerItem source item; must not be {@code null}
     */
    public MediaServerItemLite(MediaServerItem mediaServerItem) {
        this.id = mediaServerItem.getId();
        this.ip = mediaServerItem.getIp();
        this.hookIp = mediaServerItem.getHookIp();
        this.sdpIp = mediaServerItem.getSdpIp();
        this.streamIp = mediaServerItem.getStreamIp();
        this.httpPort = mediaServerItem.getHttpPort();
        this.httpSSlPort = mediaServerItem.getHttpSSlPort();
        this.rtmpPort = mediaServerItem.getRtmpPort();
        this.rtmpSSlPort = mediaServerItem.getRtmpSSlPort();
        this.rtpProxyPort = mediaServerItem.getRtpProxyPort();
        this.rtspPort = mediaServerItem.getRtspPort();
        this.rtspSSLPort = mediaServerItem.getRtspSSLPort();
        this.secret = mediaServerItem.getSecret();
        // Assigned once; the original repeated this assignment a second time.
        this.streamNoneReaderDelayMS = mediaServerItem.getStreamNoneReaderDelayMS();
        this.hookAliveInterval = mediaServerItem.getHookAliveInterval();
        this.recordAssistPort = mediaServerItem.getRecordAssistPort();
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getHookIp() {
        return hookIp;
    }

    public void setHookIp(String hookIp) {
        this.hookIp = hookIp;
    }

    public String getSdpIp() {
        return sdpIp;
    }

    public void setSdpIp(String sdpIp) {
        this.sdpIp = sdpIp;
    }

    public String getStreamIp() {
        return streamIp;
    }

    public void setStreamIp(String streamIp) {
        this.streamIp = streamIp;
    }

    public int getHttpPort() {
        return httpPort;
    }

    public void setHttpPort(int httpPort) {
        this.httpPort = httpPort;
    }

    public int getHttpSSlPort() {
        return httpSSlPort;
    }

    public void setHttpSSlPort(int httpSSlPort) {
        this.httpSSlPort = httpSSlPort;
    }

    public int getRtmpPort() {
        return rtmpPort;
    }

    public void setRtmpPort(int rtmpPort) {
        this.rtmpPort = rtmpPort;
    }

    public int getRtmpSSlPort() {
        return rtmpSSlPort;
    }

    public void setRtmpSSlPort(int rtmpSSlPort) {
        this.rtmpSSlPort = rtmpSSlPort;
    }

    public int getRtpProxyPort() {
        return rtpProxyPort;
    }

    public void setRtpProxyPort(int rtpProxyPort) {
        this.rtpProxyPort = rtpProxyPort;
    }

    public int getRtspPort() {
        return rtspPort;
    }

    public void setRtspPort(int rtspPort) {
        this.rtspPort = rtspPort;
    }

    public int getRtspSSLPort() {
        return rtspSSLPort;
    }

    public void setRtspSSLPort(int rtspSSLPort) {
        this.rtspSSLPort = rtspSSLPort;
    }

    public String getSecret() {
        return secret;
    }

    public void setSecret(String secret) {
        this.secret = secret;
    }

    public int getStreamNoneReaderDelayMS() {
        return streamNoneReaderDelayMS;
    }

    public void setStreamNoneReaderDelayMS(int streamNoneReaderDelayMS) {
        this.streamNoneReaderDelayMS = streamNoneReaderDelayMS;
    }

    public int getHookAliveInterval() {
        return hookAliveInterval;
    }

    public void setHookAliveInterval(int hookAliveInterval) {
        this.hookAliveInterval = hookAliveInterval;
    }

    public int getRecordAssistPort() {
        return recordAssistPort;
    }

    public void setRecordAssistPort(int recordAssistPort) {
        this.recordAssistPort = recordAssistPort;
    }
}
// src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.http.ResponseEntity; import org.springframework.web.context.request.async.DeferredResult; @@ -40,4 +41,6 @@ DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); } src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -26,7 +27,9 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -56,6 +59,9 @@ @Autowired private SIPCommander cmder; @Autowired private AudioBroadcastManager audioBroadcastManager; @Autowired private SIPCommanderFroPlatform sipCommanderFroPlatform; @@ -621,4 +627,42 @@ } } } @Override public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) { if (device == null || channelId == null) { return; } DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); if (deviceChannel == null) { logger.warn("开启语音广播的时候未找到通道: {}", channelId); event.call("开启语音广播的时候未找到通道"); return; } // 查询通道使用状态 if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); return; } String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; dynamicTask.startDelay(timeOutTaskKey, ()->{ logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId); event.call("语音广播发送超时"); 
audioBroadcastManager.del(device.getDeviceId(), channelId); }, timeout * 1000); // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); audioBroadcastManager.add(audioBroadcastCatch); }, eventResultForError -> { dynamicTask.stop(timeOutTaskKey); // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); event.call("语音广播发送失败"); audioBroadcastManager.del(device.getDeviceId(), channelId); }); } } src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java
@@ -37,8 +37,6 @@ "subscribeCycleForMobilePosition," + "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + "audioChannelForReceive," + "audioChannelForSend," + "ssrcCheck," + "online" + ") VALUES (" + @@ -62,8 +60,6 @@ "#{subscribeCycleForMobilePosition}," + "#{mobilePositionSubmissionInterval}," + "#{subscribeCycleForAlarm}," + "#{audioChannelForReceive}," + "#{audioChannelForSend}," + "#{ssrcCheck}," + "#{online}" + ")") @@ -90,8 +86,6 @@ "<if test=\"subscribeCycleForMobilePosition != null\">, subscribeCycleForMobilePosition=${subscribeCycleForMobilePosition}</if>" + "<if test=\"mobilePositionSubmissionInterval != null\">, mobilePositionSubmissionInterval=${mobilePositionSubmissionInterval}</if>" + "<if test=\"subscribeCycleForAlarm != null\">, subscribeCycleForAlarm=${subscribeCycleForAlarm}</if>" + "<if test=\"audioChannelForReceive != null\">, audioChannelForReceive=#{audioChannelForReceive}</if>" + "<if test=\"audioChannelForSend != null\">, audioChannelForSend=#{audioChannelForSend}</if>" + "<if test=\"ssrcCheck != null\">, ssrcCheck=${ssrcCheck}</if>" + "WHERE deviceId='${deviceId}'"+ " </script>"}) src/main/java/com/genersoft/iot/vmp/vmanager/bean/AudioBroadcastResult.java
New file @@ -0,0 +1,62 @@
package com.genersoft.iot.vmp.vmanager.bean;

import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite;

/**
 * Plain data holder describing where and how audio for a broadcast is pushed:
 * the media node, the codec, and the application name / stream id used on zlm.
 *
 * @author lin
 */
public class AudioBroadcastResult {

    /** Information about the media node the stream is pushed to. */
    private MediaServerItemLite mediaServerItem;

    /** Encoding format. */
    private String codec;

    /** Application name used when pushing the stream to zlm. */
    private String app;

    /** Stream id used when pushing the stream to zlm. */
    private String stream;

    /**
     * @return the media node information, or {@code null} if none was set
     */
    public MediaServerItemLite getMediaServerItem() {
        return this.mediaServerItem;
    }

    /**
     * @param mediaServerItem the media node information to store
     */
    public void setMediaServerItem(MediaServerItemLite mediaServerItem) {
        this.mediaServerItem = mediaServerItem;
    }

    /**
     * @return the encoding format, or {@code null} if none was set
     */
    public String getCodec() {
        return this.codec;
    }

    /**
     * @param codec the encoding format to store
     */
    public void setCodec(String codec) {
        this.codec = codec;
    }

    /**
     * @return the zlm application name, or {@code null} if none was set
     */
    public String getApp() {
        return this.app;
    }

    /**
     * @param app the zlm application name to store
     */
    public void setApp(String app) {
        this.app = app;
    }

    /**
     * @return the zlm stream id, or {@code null} if none was set
     */
    public String getStream() {
        return this.stream;
    }

    /**
     * @param stream the zlm stream id to store
     */
    public void setStream(String stream) {
        this.stream = stream;
    }
}
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; @@ -39,6 +40,9 @@ import java.util.List; import java.util.UUID; /** * @author lin */ @Api(tags = "国标设备点播") @CrossOrigin @RestController @@ -102,7 +106,7 @@ logger.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId )); String uuid = UUID.randomUUID().toString(); DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(); DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(); // 录像查询以channelId作为deviceId查询 String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId; @@ -123,7 +127,7 @@ RequestMessage msgForSuccess = new RequestMessage(); msgForSuccess.setId(uuid); msgForSuccess.setKey(key); msgForSuccess.setData(String.format("success")); msgForSuccess.setData("success"); resultHolder.invokeAllResult(msgForSuccess); }); @@ -251,81 +255,73 @@ @ApiOperation("语音广播命令") @ApiImplicitParams({ @ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class), @ApiImplicitParam(name = "channelForSend", value = "设备用于发送语音数据的通道", dataTypeClass = String.class), @ApiImplicitParam(name = "channelForReceive", value = "设备用于接收语音数据的通道", dataTypeClass = String.class), @ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class), @ApiImplicitParam(name = "timeout", value = "推流超时时间(秒)", dataTypeClass = Integer.class), }) @GetMapping("/broadcast/{deviceId}") @PostMapping("/broadcast/{deviceId}") public DeferredResult<ResponseEntity<String>> broadcastApi(@PathVariable String deviceId, String channelForSend, String channelForReceive) { 
@GetMapping("/broadcast/{deviceId}/{channelId}") @PostMapping("/broadcast/{deviceId}/{channelId}") public DeferredResult<WVPResult<AudioBroadcastResult>> broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) { if (logger.isDebugEnabled()) { logger.debug("语音广播API调用"); } Device device = storager.queryVideoDevice(deviceId); DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(3 * 1000L); String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId; if (device == null) { WVPResult<AudioBroadcastResult> result = new WVPResult<>(); result.setCode(-1); result.setMsg("未找到设备: " + deviceId); DeferredResult<WVPResult<AudioBroadcastResult>> deferredResult = new DeferredResult<>(); deferredResult.setResult(result); return deferredResult; } if (channelId == null) { WVPResult<AudioBroadcastResult> result = new WVPResult<>(); result.setCode(-1); result.setMsg("未找到通道: " + channelId); DeferredResult<WVPResult<AudioBroadcastResult>> deferredResult = new DeferredResult<>(); deferredResult.setResult(result); return deferredResult; } String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId; if (resultHolder.exist(key, null)) { result.setResult(new ResponseEntity<>("设备使用中",HttpStatus.OK)); return result; WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); wvpResult.setCode(-1); wvpResult.setMsg("设备使用中"); DeferredResult<WVPResult<AudioBroadcastResult>> deferredResult = new DeferredResult<>(); deferredResult.setResult(wvpResult); return deferredResult; } // playService.audioBroadcast(deviceId, channelForSend, channelForReceive); if (timeout == null){ timeout = 30; } DeferredResult<WVPResult<AudioBroadcastResult>> result = new DeferredResult<>(timeout.longValue()*1000 + 2000); String uuid = UUID.randomUUID().toString(); if (device == null) { resultHolder.put(key, key, result); RequestMessage msg = new RequestMessage(); msg.setKey(key); msg.setId(uuid); JSONObject json = new JSONObject(); json.put("DeviceID", 
deviceId); json.put("CmdType", "Broadcast"); json.put("Result", "Failed"); json.put("Description", "Device 不存在"); msg.setData(json); resultHolder.invokeResult(msg); return result; } cmder.audioBroadcastCmd(device, (event) -> { RequestMessage msg = new RequestMessage(); msg.setKey(key); msg.setId(uuid); JSONObject json = new JSONObject(); json.put("DeviceID", deviceId); json.put("CmdType", "Broadcast"); json.put("Result", "Failed"); json.put("Description", String.format("语音广播操作失败,错误码: %s, %s", event.statusCode, event.msg)); msg.setData(json); resultHolder.invokeResult(msg); result.onTimeout(()->{ WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); wvpResult.setCode(-1); wvpResult.setMsg("请求超时"); RequestMessage requestMessage = new RequestMessage(); requestMessage.setKey(key); requestMessage.setData(wvpResult); resultHolder.invokeAllResult(requestMessage); }); result.onTimeout(() -> { logger.warn(String.format("语音广播操作超时, 设备未返回应答指令")); RequestMessage msg = new RequestMessage(); msg.setKey(key); msg.setId(uuid); JSONObject json = new JSONObject(); json.put("DeviceID", deviceId); json.put("CmdType", "Broadcast"); json.put("Result", "Failed"); json.put("Error", "Timeout. Device did not response to broadcast command."); msg.setData(json); resultHolder.invokeResult(msg); playService.audioBroadcast(device, channelId, timeout, (msg)->{ WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); wvpResult.setCode(-1); wvpResult.setMsg(msg); RequestMessage requestMessage = new RequestMessage(); requestMessage.setKey(key); requestMessage.setData(wvpResult); resultHolder.invokeAllResult(requestMessage); }); resultHolder.put(key, uuid, result); return result; } @ApiOperation("获取所有的ssrc") @GetMapping("/ssrc") public WVPResult<JSONObject> getSSRC() { public WVPResult<JSONObject> getSsrc() { if (logger.isDebugEnabled()) { logger.debug("获取所有的ssrc"); } src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java
New file @@ -0,0 +1,9 @@
package com.genersoft.iot.vmp.vmanager.gb28181.play.bean;

/**
 * Callback that receives a text message about an audio-broadcast request.
 * It declares a single abstract method, so it can be supplied as a lambda.
 *
 * @author lin
 */
public interface AudioBroadcastEvent {

    /**
     * Delivers a message to the implementer.
     *
     * @param msg the message text. NOTE(review): the callers visible alongside this
     *            interface pass failure descriptions only; confirm whether success
     *            is ever reported through this callback.
     */
    void call(String msg);
}
web_src/src/components/dialog/deviceEdit.vue
@@ -37,9 +37,6 @@ </el-select> </el-form-item> <el-form-item label="语音发送通道" prop="name"> <el-input v-model="form.audioChannelForSend" clearable></el-input> </el-form-item> <el-form-item label="语音接收送通道" prop="name"> <el-input v-model="form.audioChannelForReceive" clearable></el-input> </el-form-item> <el-form-item label="目录订阅" title="0为取消订阅" prop="subscribeCycleForCatalog" > @@ -105,6 +102,8 @@ }) }, onSubmit: function () { console.log("onSubmit"); console.log(this.form); this.form.subscribeCycleForCatalog = this.form.subscribeCycleForCatalog||0 this.form.subscribeCycleForMobilePosition = this.form.subscribeCycleForMobilePosition||0 this.form.mobilePositionSubmissionInterval = this.form.mobilePositionSubmissionInterval||0 @@ -124,7 +123,7 @@ }); } }).catch(function (error) { console.error(error); console.log(error); }); }, close: function () {