From 394c40d8bb29b8e40fc7978ea638f1592b03d617 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 08 九月 2022 16:42:46 +0800
Subject: [PATCH] 修复目录订阅的状态异常
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 117 +++++++++++++++++++++++++++++++++++++++++++++++-----------
1 files changed, 94 insertions(+), 23 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index e4b9e3e..82b3ba4 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -13,13 +13,13 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -38,7 +38,6 @@
import javax.sdp.*;
import javax.sip.*;
-import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
@@ -65,6 +64,8 @@
@Autowired
private IStreamPushService streamPushService;
+ @Autowired
+ private IStreamProxyService streamProxyService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -86,6 +87,9 @@
@Autowired
private IMediaServerService mediaServerService;
+
+ @Autowired
+ private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@@ -142,6 +146,7 @@
MediaServerItem mediaServerItem = null;
StreamPushItem streamPushItem = null;
+ StreamProxyItem proxyByAppAndStream =null;
// 涓嶆槸閫氶亾鍙兘鏄洿鎾祦
if (channel != null && gbStream == null) {
if (channel.getStatus() == 0) {
@@ -171,6 +176,13 @@
if ("push".equals(gbStream.getStreamType())) {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null) {
+ logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
+ responseAck(evt, Response.GONE);
+ return;
+ }
+ }else if("proxy".equals(gbStream.getStreamType())){
+ proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
+ if (proxyByAppAndStream == null) {
logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(evt, Response.GONE);
return;
@@ -237,16 +249,16 @@
String protocol = media.getProtocol();
// 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp
- if ("TCP/RTP/AVP".equals(protocol)) {
+ if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
- if ("active".equals(setup)) {
+ if ("active".equalsIgnoreCase(setup)) {
tcpActive = true;
// 涓嶆敮鎸乼cp涓诲姩
responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 鐩綍涓嶆敮鎸佺偣鎾�
return;
- } else if ("passive".equals(setup)) {
+ } else if ("passive".equalsIgnoreCase(setup)) {
tcpActive = false;
}
}
@@ -291,11 +303,11 @@
return;
}
sendRtpItem.setCallId(callIdHeader.getCallId());
- sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
+ sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
Long finalStartTime = startTime;
Long finalStopTime = stopTime;
- ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
+ ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream");
logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream);
@@ -310,7 +322,7 @@
content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
content.append("s=" + sessionName + "\r\n");
content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
- if ("Playback".equals(sessionName)) {
+ if ("Playback".equalsIgnoreCase(sessionName)) {
content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
} else {
content.append("t=0 0\r\n");
@@ -354,7 +366,7 @@
}
});
sendRtpItem.setApp("rtp");
- if ("Playback".equals(sessionName)) {
+ if ("Playback".equalsIgnoreCase(sessionName)) {
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
sendRtpItem.setStreamId(ssrcInfo.getStream());
@@ -389,7 +401,14 @@
if (playTransaction != null) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
if (!streamReady) {
- playTransaction = null;
+ boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());
+ if (hasRtpServer) {
+ logger.info("[涓婄骇鐐规挱]宸茬粡寮�鍚痳tpServer浣嗘槸灏氭湭鏀跺埌娴侊紝寮�鍚洃鍚祦鐨勫埌鏉�");
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());
+ zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);
+ }else {
+ playTransaction = null;
+ }
}
}
if (playTransaction == null) {
@@ -398,6 +417,7 @@
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
+ logger.info(JSONObject.toJSONString(ssrcInfo));
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -416,14 +436,33 @@
}
}
} else if (gbStream != null) {
- if (streamPushItem != null && streamPushItem.isPushIng()) {
- // 鎺ㄦ祦鐘舵��
- pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- } else {
- // 鏈帹娴� 鎷夎捣
- notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ if("push".equals(gbStream.getStreamType())) {
+ if (streamPushItem != null && streamPushItem.isPushIng()) {
+ // 鎺ㄦ祦鐘舵��
+ pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ } else {
+ // 鏈帹娴� 鎷夎捣
+ notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }
+ }else if ("proxy".equals(gbStream.getStreamType())){
+ if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
+ pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }else{
+ //寮�鍚唬鐞嗘媺娴�
+ boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+ if(start1) {
+ pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }else{
+ //澶辫触鍚庨�氱煡
+ notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }
+ }
+
}
}
}
@@ -442,7 +481,39 @@
/**
* 瀹夋帓鎺ㄦ祦
*/
+ private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
+ CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
+ int port, Boolean tcpActive, boolean mediaTransmissionTCP,
+ String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
+ Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ if (streamReady) {
+ // 鑷钩鍙板唴瀹�
+ SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+ gbStream.getApp(), gbStream.getStream(), channelId,
+ mediaTransmissionTCP);
+ if (sendRtpItem == null) {
+ logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+ responseAck(evt, Response.BUSY_HERE);
+ return;
+ }
+ if (tcpActive != null) {
+ sendRtpItem.setTcpActive(tcpActive);
+ }
+ sendRtpItem.setPlayType(InviteStreamType.PUSH);
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ sendRtpItem.setCallId(callIdHeader.getCallId());
+ byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
+ sendRtpItem.setDialog(dialogByteArray);
+ byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
+ sendRtpItem.setTransaction(transactionByteArray);
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
+
+ }
+
+ }
private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
@@ -487,7 +558,6 @@
}
}
-
/**
* 閫氱煡娴佷笂绾�
*/
@@ -501,7 +571,8 @@
responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
} else if ("push".equals(gbStream.getStreamType())) {
if (!platform.isStartOfflinePush()) {
- responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
+ // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
+ responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
return;
}
// 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
@@ -537,7 +608,7 @@
app, stream, channelId, mediaTransmissionTCP);
if (sendRtpItem == null) {
- logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+ logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
try {
responseAck(evt, Response.BUSY_HERE);
} catch (SipException e) {
--
Gitblit v1.8.0