From 89a9ab4534f10a224f70e546db838423e84a1965 Mon Sep 17 00:00:00 2001 From: 64850858 <648540858@qq.com> Date: 星期五, 16 七月 2021 16:34:51 +0800 Subject: [PATCH] 添加zlm集群支持 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java | 125 +++++++++++++++++++++++++++-------------- 1 files changed, 81 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java index 91b6ecc..7219dbb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java @@ -11,12 +11,16 @@ import javax.sip.message.Request; import javax.sip.message.Response; -import com.genersoft.iot.vmp.conf.MediaServerConfig; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; @@ -50,6 +54,8 @@ private IPlayService playService; private ZLMRTPServerFactory zlmrtpServerFactory; + + private IMediaServerService mediaServerService; public ZLMRTPServerFactory getZlmrtpServerFactory() { return zlmrtpServerFactory; @@ -91,8 +97,29 @@ // 鏌ヨ骞冲彴涓嬫槸鍚︽湁璇ラ�氶亾 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); + IMediaServerItem mediaServerItem = null; // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦 - if (channel != null || gbStream != null ) { + if (channel != null && gbStream == null ) { + if (channel.getStatus() == 0) { + logger.info("閫氶亾绂荤嚎锛岃繑鍥�400"); + responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); + return; + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 閫氶亾瀛樺湪锛屽彂181锛屽懠鍙浆鎺ヤ腑 + }else if(channel == null && gbStream != null){ + String mediaServerId = gbStream.getMediaServerId(); + mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + logger.info("[ app={}, stream={} ]zlm鎵句笉鍒帮紝杩斿洖410",gbStream.getApp(), gbStream.getStream()); + responseAck(evt, Response.GONE, "media server not found"); + return; + } + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (!streamReady ) { + logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛岃繑鍥�400",gbStream.getApp(), gbStream.getStream()); + responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); + return; + } responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 閫氶亾瀛樺湪锛屽彂181锛屽懠鍙浆鎺ヤ腑 }else { logger.info("閫氶亾涓嶅瓨鍦紝杩斿洖404"); @@ -117,8 +144,8 @@ //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); @@ -134,7 +161,7 @@ mediaTransmissionTCP = true; if ("active".equals(setup)) { tcpActive = true; - }else if ("passive".equals(setup)) { + } else if ("passive".equals(setup)) { tcpActive = false; } } @@ -161,7 +188,13 @@ responseAck(evt, Response.SERVER_INTERNAL_ERROR); return; } - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, + mediaServerItem = playService.getNewMediaServerItem(device); + if (mediaServerItem == null) { + logger.warn("鏈壘鍒板彲鐢ㄧ殑zlm"); + responseAck(evt, Response.BUSY_HERE); + return; + } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, mediaTransmissionTCP); if (tcpActive != null) { @@ -176,18 +209,18 @@ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); // 閫氱煡涓嬬骇鎺ㄦ祦锛� - PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{ + PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{ // 鏀跺埌鎺ㄦ祦锛� 鍥炲200OK, 绛夊緟ack // if (sendRtpItem == null) return; sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); // TODO 娣诲姞瀵箃cp鐨勬敮鎸� - MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); + StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); + content.append("o="+"00000"+" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n"); + content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); @@ -204,7 +237,7 @@ } catch (ParseException e) { e.printStackTrace(); } - } ,(event -> { + } ,((event) -> { // 鏈煡閿欒銆傜洿鎺ヨ浆鍙戣澶囩偣鎾殑閿欒 Response response = null; try { @@ -219,7 +252,7 @@ } }else if (gbStream != null) { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP); @@ -235,34 +268,29 @@ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); - // 妫�娴嬬洿鎾祦鏄惁鍦ㄧ嚎 - Boolean streamReady = zlmrtpServerFactory.isStreamReady(gbStream.getApp(), gbStream.getStream()); - if (streamReady) { - sendRtpItem.setStatus(1); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 娣诲姞瀵箃cp鐨勬敮鎸� - MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n"); - content.append("t=0 0\r\n"); - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - content.append("y="+ ssrc + "\r\n"); - content.append("f=\r\n"); + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // TODO 娣诲姞瀵箃cp鐨勬敮鎸� + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+"00000"+" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y="+ ssrc + "\r\n"); + content.append("f=\r\n"); - try { - responseAck(evt, content.toString()); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } + try { + responseAck(evt, content.toString()); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); } } @@ -327,11 +355,6 @@ String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); logger.info("璁惧{}璇锋眰璇煶娴侊紝鍦板潃锛歿}:{}锛宻src锛歿}", username, addressStr, port, ssrc); - - - - - } else { logger.warn("鏉ヨ嚜鏃犳晥璁惧/骞冲彴鐨勮姹�"); @@ -364,6 +387,12 @@ */ private void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); + getServerTransaction(evt).sendResponse(response); + } + + private void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); + response.setReasonPhrase(msg); getServerTransaction(evt).sendResponse(response); } @@ -434,4 +463,12 @@ public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + public IMediaServerService getMediaServerService() { + return mediaServerService; + } + + public void setMediaServerService(IMediaServerService mediaServerService) { + this.mediaServerService = mediaServerService; + } } -- Gitblit v1.8.0