From 935221ab4112894ad3bc92ed91eff3af8bd2226b Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 08 八月 2022 09:32:38 +0800
Subject: [PATCH] Merge pull request #567 from mrjackwang/wvp-28181-2.0
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 80 +++++++++++++++++++++++---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 7 ++
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java | 29 +++++++++
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 39 +++++++++++-
4 files changed, 139 insertions(+), 16 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index e4b9e3e..34cb753 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,9 +17,11 @@
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,6 +67,8 @@
@Autowired
private IStreamPushService streamPushService;
+ @Autowired
+ private IStreamProxyService streamProxyService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -142,6 +146,7 @@
MediaServerItem mediaServerItem = null;
StreamPushItem streamPushItem = null;
+ StreamProxyItem proxyByAppAndStream =null;
// 涓嶆槸閫氶亾鍙兘鏄洿鎾祦
if (channel != null && gbStream == null) {
if (channel.getStatus() == 0) {
@@ -171,6 +176,13 @@
if ("push".equals(gbStream.getStreamType())) {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null) {
+ logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
+ responseAck(evt, Response.GONE);
+ return;
+ }
+ }else if("proxy".equals(gbStream.getStreamType())){
+ proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
+ if (proxyByAppAndStream == null) {
logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(evt, Response.GONE);
return;
@@ -416,14 +428,33 @@
}
}
} else if (gbStream != null) {
- if (streamPushItem != null && streamPushItem.isPushIng()) {
- // 鎺ㄦ祦鐘舵��
- pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- } else {
- // 鏈帹娴� 鎷夎捣
- notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ if("push".equals(gbStream.getStreamType())) {
+ if (streamPushItem != null && streamPushItem.isPushIng()) {
+ // 鎺ㄦ祦鐘舵��
+ pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ } else {
+ // 鏈帹娴� 鎷夎捣
+ notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }
+ }else if ("proxy".equals(gbStream.getStreamType())){
+ if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
+ pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }else{
+ //寮�鍚唬鐞嗘媺娴�
+ boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+ if(start1) {
+ pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }else{
+ //澶辫触鍚庨�氱煡
+ notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ }
+ }
+
}
}
}
@@ -442,7 +473,39 @@
/**
* 瀹夋帓鎺ㄦ祦
*/
+ private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
+ CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
+ int port, Boolean tcpActive, boolean mediaTransmissionTCP,
+ String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
+ Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ if (streamReady) {
+ // 鑷钩鍙板唴瀹�
+ SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+ gbStream.getApp(), gbStream.getStream(), channelId,
+ mediaTransmissionTCP);
+ if (sendRtpItem == null) {
+ logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+ responseAck(evt, Response.BUSY_HERE);
+ return;
+ }
+ if (tcpActive != null) {
+ sendRtpItem.setTcpActive(tcpActive);
+ }
+ sendRtpItem.setPlayType(InviteStreamType.PUSH);
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ sendRtpItem.setCallId(callIdHeader.getCallId());
+ byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
+ sendRtpItem.setDialog(dialogByteArray);
+ byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
+ sendRtpItem.setTransaction(transactionByteArray);
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
+
+ }
+
+ }
private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
@@ -487,7 +550,6 @@
}
}
-
/**
* 閫氱煡娴佷笂绾�
*/
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index c23cfcd..5974918 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Set;
+import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -53,6 +54,9 @@
@Autowired
private SipConfig sipConfig;
+
+ @Autowired
+ private ZLMRunner zlmRunner;
@Value("${server.ssl.enabled:false}")
private boolean sslEnabled;
@@ -277,7 +281,13 @@
return null;
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
- return (MediaServerItem)redisUtil.get(key);
+ MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
+ if(null==serverItem){
+ //zlm鏈嶅姟涓嶅湪绾匡紝鍚姩閲嶈繛
+ reloadZlm();
+ serverItem=(MediaServerItem)redisUtil.get(key);
+ }
+ return serverItem;
}
@Override
@@ -470,8 +480,13 @@
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
- logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣");
- return null;
+ logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣锛屽惎鍔ㄩ噸杩炴満鍒�");
+ //鍚姩閲嶈繛
+ reloadZlm();
+ if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
+ logger.info("鑾峰彇璐熻浇鏈�浣庣殑鑺傜偣鏃舵棤鍦ㄧ嚎鑺傜偣");
+ return null;
+ }
}
// 鑾峰彇鍒嗘暟鏈�浣庣殑锛屽強骞跺彂鏈�浣庣殑
@@ -633,8 +648,14 @@
MediaServerItem mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) {
// zlm杩炴帴閲嶈瘯
- logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
- return;
+ logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�,灏濊瘯閲嶈繛zlm");
+ reloadZlm();
+ mediaServerItem = getOne(mediaServerId);
+ if (mediaServerItem == null) {
+ // zlm杩炴帴閲嶈瘯
+ logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
+ return;
+ }
}
String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
@@ -657,4 +678,12 @@
}
}
+ public void reloadZlm(){
+ try {
+ zlmRunner.run();
+ Thread.sleep(500);//寤惰繜0.5绉掔紦鍐叉椂闂�
+ } catch (Exception e) {
+ logger.warn("灏濊瘯閲嶈繛zlm澶辫触锛�",e);
+ }
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 5fa83f5..6c6c04b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
+import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -78,6 +79,10 @@
@Autowired
TransactionDefinition transactionDefinition;
+ @Autowired
+ private MediaConfig mediaConfig;
+
+
@Override
public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
if (jsonData == null) {
@@ -142,6 +147,8 @@
stream.setStreamType("push");
stream.setStatus(true);
stream.setCreateTime(DateUtil.getNow());
+ stream.setStreamType("push");
+ stream.setMediaServerId(mediaConfig.getId());
int add = gbStreamMapper.add(stream);
return add > 0;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
index 94fe8df..48973f9 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
@@ -6,6 +6,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -37,6 +38,8 @@
@Autowired
private IMediaService mediaService;
+ @Autowired
+ private IStreamProxyService streamProxyService;
/**
@@ -95,8 +98,30 @@
result.setMsg("scccess");
result.setData(streamInfo);
}else {
- result.setCode(-1);
- result.setMsg("fail");
+ //鑾峰彇娴佸け璐ワ紝閲嶅惎鎷夋祦鍚庨噸璇曚竴娆�
+ streamProxyService.stop(app,stream);
+ boolean start = streamProxyService.start(app, stream);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
+ String host = request.getHeader("Host");
+ String localAddr = host.split(":")[0];
+ logger.info("浣跨敤{}浣滀负杩斿洖娴佺殑ip", localAddr);
+ streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
+ }else {
+ streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
+ }
+ if (streamInfo != null){
+ result.setCode(0);
+ result.setMsg("scccess");
+ result.setData(streamInfo);
+ }else {
+ result.setCode(-1);
+ result.setMsg("fail");
+ }
}
return result;
}
--
Gitblit v1.8.0