From ff1e9f79c2992777fb7a1ecaeb85c8acb1250535 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 07 七月 2023 14:51:41 +0800 Subject: [PATCH] 优化海康发送的Message消息设置的消息体长度为0,导致无法解析的问题 #920 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 201 +++++++++++++++++++++++--------------------------- 1 files changed, 92 insertions(+), 109 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index b7f600f..c0a1fec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,12 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; @@ -14,13 +12,16 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -90,7 +91,7 @@ private SIPSender sipSender; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -100,9 +101,6 @@ @Autowired private SIPProcessorObserver sipProcessorObserver; - - @Autowired - private VideoStreamSessionManager sessionManager; @Autowired private UserSetting userSetting; @@ -330,10 +328,9 @@ } String ssrc; - if (gb28181Sdp.getSsrc() == null) { + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - logger.warn("[涓婄骇Invite] {} 骞冲彴锛歿}锛� 閫氶亾锛歿}, 缂哄皯 ssrc锛岃ˉ鍏呬负锛� {}", sessionName, username, channelId, ssrc); }else { ssrc = gb28181Sdp.getSsrc(); } @@ -348,10 +345,8 @@ streamTypeStr = "UDP"; } logger.info("[涓婄骇Invite] {}, 骞冲彴锛歿}锛� 閫氶亾锛歿}, 鏀舵祦鍦板潃锛歿}:{}锛屾敹娴佹柟寮忥細{}, ssrc锛歿}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); @@ -370,10 +365,10 @@ Long finalStartTime = startTime; Long finalStopTime = stopTime; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { - String app = responseJSON.getString("app"); - String stream = responseJSON.getString("stream"); - logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream); + ErrorCallback<Object> hookEvent = (code, msg, data) -> { + StreamInfo streamInfo = (StreamInfo)data; + MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); + logger.info("[涓婄骇Invite]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", streamInfo.getApp(), streamInfo.getStream()); // * 0 绛夊緟璁惧鎺ㄦ祦涓婃潵 // * 1 涓嬬骇宸茬粡鎺ㄦ祦锛岀瓑寰呬笂绾у钩鍙板洖澶峚ck // * 2 鎺ㄦ祦涓� @@ -419,11 +414,13 @@ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍥炲SdpAck", e); } }; - SipSubscribe.Event errorEvent = ((event) -> { + ErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> { // 鏈煡閿欒銆傜洿鎺ヨ浆鍙戣澶囩偣鎾殑閿欒 try { - Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); - sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); + if (statusCode > 0) { + Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); + sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); + } } catch (ParseException | SipException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } @@ -436,73 +433,75 @@ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), null, result -> { - if (result.getCode() != 0) { - logger.warn("褰曞儚鍥炴斁澶辫触"); - if (result.getEvent() != null) { - errorEvent.response(result.getEvent()); - } + DateUtil.formatter.format(end), + (code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()){ + hookEvent.run(code, msg, data); + }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ + logger.info("[褰曞儚鍥炴斁]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - try { - responseAck(request, Response.REQUEST_TIMEOUT); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 褰曞儚鍥炴斁 鍙戦�丷EQUEST_TIMEOUT: {}", e.getMessage()); - } - } else { - if (result.getMediaServerItem() != null) { - hookEvent.response(result.getMediaServerItem(), result.getResponse()); - } + errorEvent.run(code, msg, data); + }else { + errorEvent.run(code, msg, data); } }); - } else { - sendRtpItem.setPlayType(InviteStreamType.PLAY); - SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); - if (playTransaction != null) { - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); - if (!streamReady) { - boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream()); - if (hasRtpServer) { - logger.info("[涓婄骇鐐规挱]宸茬粡寮�鍚痳tpServer浣嗘槸灏氭湭鏀跺埌娴侊紝寮�鍚洃鍚祦鐨勫埌鏉�"); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent); - }else { - playTransaction = null; - } - } + }else if ("Download".equalsIgnoreCase(sessionName)) { + // 鑾峰彇鎸囧畾鐨勪笅杞介�熷害 + Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); + MediaDescription mediaDescription = null; + String downloadSpeed = "1"; + if (sdpMediaDescriptions.size() > 0) { + mediaDescription = (MediaDescription)sdpMediaDescriptions.get(0); } - if (playTransaction == null) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); - logger.info(JSONObject.toJSONString(ssrcInfo)); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - sendRtpItem.setSsrc(ssrc); + if (mediaDescription != null) { + downloadSpeed = mediaDescription.getAttribute("downloadspeed"); + } - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - redisCatchStorage.updateSendRTPSever(sendRtpItem); - MediaServerItem finalMediaServerItem = mediaServerItem; - playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { + sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), + DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), + (code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()){ + hookEvent.run(code, msg, data); + }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ + logger.info("[褰曞儚涓嬭浇]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + errorEvent.run(code, msg, data); + }else { + errorEvent.run(code, msg, data); + } + }); + }else { + sendRtpItem.setPlayType(InviteStreamType.PLAY); + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + }else { + streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); + } + sendRtpItem.setStreamId(streamId); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()){ + hookEvent.run(code, msg, data); + }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }); - } else { - // 褰撳墠绯荤粺浣滀负涓嬬骇骞冲彴浣跨敤锛屽綋涓婄骇骞冲彴鐐规挱鏃朵笉鎼哄甫ssrc鏃讹紝骞朵笖璁惧鍦ㄥ綋鍓嶇郴缁熶腑宸茬粡鐐规挱浜嗐�傝繖涓椂鍊欓渶瑕侀噸鏂扮粰鐢熸垚涓�涓猻src锛屼笉浣跨敤榛樿鐨�"0000000000"銆� - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setStreamId(playTransaction.getStream()); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - redisCatchStorage.updateSendRTPSever(sendRtpItem); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("app", sendRtpItem.getApp()); - jsonObject.put("stream", sendRtpItem.getStreamId()); - hookEvent.response(mediaServerItem, jsonObject); - } + errorEvent.run(code, msg, data); + }else { + errorEvent.run(code, msg, data); + } + })); + } } else if (gbStream != null) { String ssrc; - if (gb28181Sdp.getSsrc() == null) { + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); }else { @@ -549,13 +548,11 @@ CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (streamReady) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); @@ -590,13 +587,11 @@ String channelId, String addressStr, String ssrc, String requesterId) { // 鎺ㄦ祦 if (streamPushItem.isSelf()) { - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (streamReady) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); @@ -647,10 +642,9 @@ logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); // 鐩戝惉娴佷笂绾� HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> { - String app = responseJSON.getString("app"); - String stream = responseJSON.getString("stream"); - logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", app, stream); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); dynamicTask.stop(callIdHeader.getCallId()); pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); @@ -712,10 +706,8 @@ mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, + app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); @@ -886,20 +878,11 @@ } String contentString = new String(request.getRawContent()); // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎绉婚櫎浠ヨВ鏋愩�� - String substring = contentString; String ssrc = "0000000404"; - int ssrcIndex = contentString.indexOf("y="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } - ssrcIndex = substring.indexOf("f="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - } - SessionDescription sdp = null; + try { - sdp = SdpFactory.getInstance().createSessionDescription(substring); + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); // 鑾峰彇鏀寔鐨勬牸寮� Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 鏌ョ湅鏄惁鏀寔PS 璐熻浇96 -- Gitblit v1.8.0