From 8870f5f5a182f4af527dc2b89ad75063019df14f Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 10 十一月 2022 09:40:01 +0800
Subject: [PATCH] 优化使用来源ip作为流ip
---
src/main/java/com/genersoft/iot/vmp/common/StreamURL.java | 4
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 166 ++++++++++--------
src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java | 9 -
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java | 92 ++++++++++
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java | 2
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java | 6
src/main/resources/all-application.yml | 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java | 65 +++++--
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java | 31 +++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 10
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java | 7
src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java | 10
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java | 74 +++++--
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java | 7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java | 9
15 files changed, 343 insertions(+), 151 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
index 20a286c..46b0e00 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -2,10 +2,10 @@
import io.swagger.v3.oas.annotations.media.Schema;
-import java.net.URL;
+import java.io.Serializable;
@Schema(description = "娴佷俊鎭�")
-public class StreamInfo {
+public class StreamInfo implements Serializable {
@Schema(description = "搴旂敤鍚�")
private String app;
@@ -79,6 +79,94 @@
@Schema(description = "鏄惁鏆傚仠锛堝綍鍍忓洖鏀句娇鐢級")
private boolean pause;
+ public void setFlv(StreamURL flv) {
+ this.flv = flv;
+ }
+
+ public void setHttps_flv(StreamURL https_flv) {
+ this.https_flv = https_flv;
+ }
+
+ public void setWs_flv(StreamURL ws_flv) {
+ this.ws_flv = ws_flv;
+ }
+
+ public void setWss_flv(StreamURL wss_flv) {
+ this.wss_flv = wss_flv;
+ }
+
+ public void setFmp4(StreamURL fmp4) {
+ this.fmp4 = fmp4;
+ }
+
+ public void setHttps_fmp4(StreamURL https_fmp4) {
+ this.https_fmp4 = https_fmp4;
+ }
+
+ public void setWs_fmp4(StreamURL ws_fmp4) {
+ this.ws_fmp4 = ws_fmp4;
+ }
+
+ public void setWss_fmp4(StreamURL wss_fmp4) {
+ this.wss_fmp4 = wss_fmp4;
+ }
+
+ public void setHls(StreamURL hls) {
+ this.hls = hls;
+ }
+
+ public void setHttps_hls(StreamURL https_hls) {
+ this.https_hls = https_hls;
+ }
+
+ public void setWs_hls(StreamURL ws_hls) {
+ this.ws_hls = ws_hls;
+ }
+
+ public void setWss_hls(StreamURL wss_hls) {
+ this.wss_hls = wss_hls;
+ }
+
+ public void setTs(StreamURL ts) {
+ this.ts = ts;
+ }
+
+ public void setHttps_ts(StreamURL https_ts) {
+ this.https_ts = https_ts;
+ }
+
+ public void setWs_ts(StreamURL ws_ts) {
+ this.ws_ts = ws_ts;
+ }
+
+ public void setWss_ts(StreamURL wss_ts) {
+ this.wss_ts = wss_ts;
+ }
+
+ public void setRtmp(StreamURL rtmp) {
+ this.rtmp = rtmp;
+ }
+
+ public void setRtmps(StreamURL rtmps) {
+ this.rtmps = rtmps;
+ }
+
+ public void setRtsp(StreamURL rtsp) {
+ this.rtsp = rtsp;
+ }
+
+ public void setRtsps(StreamURL rtsps) {
+ this.rtsps = rtsps;
+ }
+
+ public void setRtc(StreamURL rtc) {
+ this.rtc = rtc;
+ }
+
+ public void setRtcs(StreamURL rtcs) {
+ this.rtcs = rtcs;
+ }
+
public void setRtmp(String host, int port, int sslPort, String app, String stream, String callIdParam) {
String file = String.format("%s/%s/%s", app, stream, callIdParam);
this.rtmp = new StreamURL("rtmp", host, port, file);
diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java b/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java
index bb67dee..eecf469 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java
@@ -2,9 +2,11 @@
import io.swagger.v3.oas.annotations.media.Schema;
+import java.io.Serializable;
+
@Schema(description = "娴佸湴鍧�淇℃伅")
-public class StreamURL {
+public class StreamURL implements Serializable {
@Schema(description = "鍗忚")
private String protocol;
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
index 67b89d6..ff9008e 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
@@ -20,8 +20,6 @@
Integer ptzSpeed = 50;
- Integer keepaliveTimeOut = 255;
-
Integer registerTimeInterval = 120;
private boolean alarm;
@@ -50,9 +48,6 @@
this.ptzSpeed = ptzSpeed;
}
- public void setKeepaliveTimeOut(Integer keepaliveTimeOut) {
- this.keepaliveTimeOut = keepaliveTimeOut;
- }
public void setRegisterTimeInterval(Integer registerTimeInterval) {
this.registerTimeInterval = registerTimeInterval;
@@ -84,10 +79,6 @@
public Integer getPtzSpeed() {
return ptzSpeed;
- }
-
- public Integer getKeepaliveTimeOut() {
- return keepaliveTimeOut;
}
public Integer getRegisterTimeInterval() {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index f3c8bed..152c7fd 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -11,6 +11,7 @@
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
import javax.sip.*;
import java.util.*;
@@ -107,6 +108,9 @@
}
public SipProviderImpl getUdpSipProvider(String ip) {
+ if (ObjectUtils.isEmpty(ip)) {
+ return null;
+ }
return udpSipProviderMap.get(ip);
}
@@ -125,6 +129,9 @@
}
public SipProviderImpl getTcpSipProvider(String ip) {
+ if (ObjectUtils.isEmpty(ip)) {
+ return null;
+ }
return tcpSipProviderMap.get(ip);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java
index a240ce4..f723a6f 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java
@@ -5,22 +5,19 @@
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.GitUtil;
import gov.nist.javax.sip.SipProviderImpl;
-import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
import javax.sip.SipException;
-import javax.sip.SipFactory;
import javax.sip.header.CallIdHeader;
import javax.sip.header.UserAgentHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Message;
import javax.sip.message.Request;
import javax.sip.message.Response;
-import java.net.InetAddress;
import java.text.ParseException;
/**
@@ -109,6 +106,10 @@
}
public CallIdHeader getNewCallIdHeader(String ip, String transport){
+ if (ObjectUtils.isEmpty(ip) || ObjectUtils.isEmpty(transport)) {
+ return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider().getNewCallId()
+ : sipLayer.getUdpSipProvider().getNewCallId();
+ }
return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip).getNewCallId()
: sipLayer.getUdpSipProvider(ip).getNewCallId();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
index ba65581..3d5c294 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -1,14 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.callback;
-import java.util.HashMap;
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import org.springframework.web.context.request.async.DeferredResult;
/**
* @description: 寮傛璇锋眰澶勭悊
@@ -51,31 +52,48 @@
public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
- private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>();
+ private Map<String, Map<String, DeferredResultEx>> map = new ConcurrentHashMap<>();
- public void put(String key, String id, DeferredResult result) {
- Map<String, DeferredResult> deferredResultMap = map.get(key);
+ public void put(String key, String id, DeferredResultEx result) {
+ Map<String, DeferredResultEx> deferredResultMap = map.get(key);
if (deferredResultMap == null) {
deferredResultMap = new ConcurrentHashMap<>();
map.put(key, deferredResultMap);
}
deferredResultMap.put(id, result);
}
-
- public DeferredResult get(String key, String id) {
- Map<String, DeferredResult> deferredResultMap = map.get(key);
+
+ public void put(String key, String id, DeferredResult result) {
+ Map<String, DeferredResultEx> deferredResultMap = map.get(key);
if (deferredResultMap == null) {
+ deferredResultMap = new ConcurrentHashMap<>();
+ map.put(key, deferredResultMap);
+ }
+ deferredResultMap.put(id, new DeferredResultEx(result));
+ }
+
+ public DeferredResultEx get(String key, String id) {
+ Map<String, DeferredResultEx> deferredResultMap = map.get(key);
+ if (deferredResultMap == null || ObjectUtils.isEmpty(id)) {
return null;
}
return deferredResultMap.get(id);
+ }
+
+ public Collection<DeferredResultEx> getAllByKey(String key) {
+ Map<String, DeferredResultEx> deferredResultMap = map.get(key);
+ if (deferredResultMap == null) {
+ return null;
+ }
+ return deferredResultMap.values();
}
public boolean exist(String key, String id){
if (key == null) {
return false;
}
- Map<String, DeferredResult> deferredResultMap = map.get(key);
+ Map<String, DeferredResultEx> deferredResultMap = map.get(key);
if (id == null) {
return deferredResultMap != null;
}else {
@@ -88,15 +106,15 @@
* @param msg
*/
public void invokeResult(RequestMessage msg) {
- Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
+ Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey());
if (deferredResultMap == null) {
return;
}
- DeferredResult result = deferredResultMap.get(msg.getId());
+ DeferredResultEx result = deferredResultMap.get(msg.getId());
if (result == null) {
return;
}
- result.setResult(msg.getData());
+ result.getDeferredResult().setResult(msg.getData());
deferredResultMap.remove(msg.getId());
if (deferredResultMap.size() == 0) {
map.remove(msg.getKey());
@@ -108,18 +126,27 @@
* @param msg
*/
public void invokeAllResult(RequestMessage msg) {
- Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
+ Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey());
if (deferredResultMap == null) {
return;
}
Set<String> ids = deferredResultMap.keySet();
for (String id : ids) {
- DeferredResult result = deferredResultMap.get(id);
+ DeferredResultEx result = deferredResultMap.get(id);
if (result == null) {
return;
}
- result.setResult(msg.getData());
+ if (result.getFilter() != null) {
+ Object handler = result.getFilter().handler(msg.getData());
+ System.out.println(JSON.toJSONString(handler));
+ result.getDeferredResult().setResult(handler);
+ }else {
+ result.getDeferredResult().setResult(msg.getData());
+ }
+
}
map.remove(msg.getKey());
}
+
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index ea8275f..8817122 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -8,14 +8,13 @@
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -39,9 +38,10 @@
import org.springframework.stereotype.Component;
import javax.sdp.*;
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
-import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
@@ -479,7 +479,7 @@
playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId);
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
- }, null);
+ });
} else {
sendRtpItem.setStreamId(playTransaction.getStream());
// 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
index 1991392..8b3984f 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -13,7 +13,6 @@
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
@@ -25,12 +24,12 @@
*/
public interface IPlayService {
- void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
+ void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
- InviteTimeOutCallback timeoutCallback, String uuid);
- PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
+ InviteTimeOutCallback timeoutCallback);
+ void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
MediaServerItem getNewMediaServerItem(Device device);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index f899db3..3bbe88b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,49 +1,28 @@
package com.genersoft.iot.vmp.service.impl;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.text.ParseException;
-import java.util.*;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.ResponseEvent;
-import javax.sip.SipException;
-
-import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.conf.exception.ServiceException;
-import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.service.IDeviceService;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Service;
-import org.springframework.util.ObjectUtils;
-import org.springframework.web.context.request.async.DeferredResult;
-
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.conf.exception.ServiceException;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -53,8 +32,27 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.ParseException;
+import java.util.List;
+import java.util.UUID;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@@ -111,46 +109,19 @@
private ThreadPoolTaskExecutor taskExecutor;
@Override
- public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
- ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
- Runnable timeoutCallback) {
+ public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
+ ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+ Runnable timeoutCallback) {
if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm");
}
- PlayResult playResult = new PlayResult();
+
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
msg.setKey(key);
- String uuid = UUID.randomUUID().toString();
- msg.setId(uuid);
- playResult.setUuid(uuid);
- DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
- playResult.setResult(result);
- // 褰曞儚鏌ヨ浠hannelId浣滀负deviceId鏌ヨ
- resultHolder.put(key, uuid, result);
Device device = redisCatchStorage.getDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
- playResult.setDevice(device);
-
- result.onCompletion(() -> {
- // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙�
- taskExecutor.execute(() -> {
- // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉�
- String path = "snap";
- String fileName = deviceId + "_" + channelId + ".jpg";
- WVPResult wvpResult = (WVPResult) result.getResult();
- if (Objects.requireNonNull(wvpResult).getCode() == 0) {
- StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData();
- MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
- String streamUrl = streamInfoForSuccess.getFmp4().getUrl();
-
- // 璇锋眰鎴浘
- logger.info("[璇锋眰鎴浘]: " + fileName);
- zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
- }
- });
- });
if (streamInfo != null) {
String streamId = streamInfo.getStream();
@@ -160,7 +131,7 @@
wvpResult.setMsg("鐐规挱澶辫触锛� redis缂撳瓨streamId绛変簬null");
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
- return playResult;
+ return;
}
String mediaServerId = streamInfo.getMediaServerId();
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -178,14 +149,13 @@
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
- return playResult;
+ return;
} else {
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
wvpResult.setData(streamInfo);
msg.setData(wvpResult);
-
resultHolder.invokeAllResult(msg);
if (hookEvent != null) {
hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo)));
@@ -211,7 +181,6 @@
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
- logger.info(JSONObject.toJSONString(ssrcInfo));
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
@@ -238,16 +207,15 @@
msg.setData(wvpResult);
// 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰
resultHolder.invokeAllResult(msg);
- }, uuid);
+ });
}
- return playResult;
}
@Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
- InviteTimeOutCallback timeoutCallback, String uuid) {
+ InviteTimeOutCallback timeoutCallback) {
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
@@ -281,6 +249,16 @@
//绔彛鑾峰彇澶辫触鐨剆srcInfo 娌℃湁蹇呰鍙戦�佺偣鎾寚浠�
if (ssrcInfo.getPort() <= 0) {
logger.info("[鐐规挱绔彛鍒嗛厤寮傚父]锛宒eviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
+ dynamicTask.stop(timeOutTaskKey);
+ // 閲婃斁ssrc
+ mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
+
+ streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
+
+ RequestMessage msg = new RequestMessage();
+ msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
+ msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "鐐规挱绔彛鍒嗛厤寮傚父"));
+ resultHolder.invokeAllResult(msg);
return;
}
try {
@@ -289,9 +267,15 @@
System.out.println("鍋滄瓒呮椂浠诲姟锛� " + timeOutTaskKey);
dynamicTask.stop(timeOutTaskKey);
// hook鍝嶅簲
- onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
+ onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
hookEvent.response(mediaServerItemInuse, response);
logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
+ String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream);
+ String path = "snap";
+ String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
+ // 璇锋眰鎴浘
+ logger.info("[璇锋眰鎴浘]: " + fileName);
+ zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
}, (event) -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
@@ -331,7 +315,7 @@
logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// hook鍝嶅簲
- onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+ onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
hookEvent.response(mediaServerItemInUse, response);
});
}
@@ -367,13 +351,41 @@
}
@Override
- public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
+ public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
+ StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
RequestMessage msg = new RequestMessage();
- if (uuid != null) {
+ msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
+ if (streamInfo != null) {
+ DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+ if (deviceChannel != null) {
+ deviceChannel.setStreamId(streamInfo.getStream());
+ storager.startPlay(deviceId, channelId, streamInfo.getStream());
+ }
+ redisCatchStorage.startPlay(streamInfo);
+
+ WVPResult wvpResult = new WVPResult();
+ wvpResult.setCode(ErrorCode.SUCCESS.getCode());
+ wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
+ wvpResult.setData(streamInfo);
+
+ msg.setData(wvpResult);
+ resultHolder.invokeAllResult(msg);
+
+ } else {
+ logger.warn("璁惧棰勮API璋冪敤澶辫触锛�");
+ msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "璁惧棰勮API璋冪敤澶辫触锛�"));
+ resultHolder.invokeAllResult(msg);
+ }
+ }
+
+ private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
+ RequestMessage msg = new RequestMessage();
+ msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
+ if (!ObjectUtils.isEmpty(uuid)) {
msg.setId(uuid);
}
- msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
+
if (streamInfo != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
@@ -390,8 +402,8 @@
resultHolder.invokeAllResult(msg);
} else {
- logger.warn("璁惧棰勮API璋冪敤澶辫触锛�");
- msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "璁惧棰勮API璋冪敤澶辫触锛�"));
+ logger.warn("褰曞儚鍥炴斁璋冪敤澶辫触锛�");
+ msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "褰曞儚鍥炴斁璋冪敤澶辫触锛�"));
resultHolder.invokeAllResult(msg);
}
}
@@ -545,7 +557,7 @@
logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
// hook鍝嶅簲
- onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
+ onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
});
}
@@ -568,6 +580,8 @@
return result;
}
+
+
@Override
public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
Device device = storager.queryVideoDevice(deviceId);
diff --git a/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java b/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java
index 81e6249..466a503 100644
--- a/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java
+++ b/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java
@@ -1,14 +1,12 @@
package com.genersoft.iot.vmp.utils.redis;
-import java.nio.charset.Charset;
-
+import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter.Feature;
+import java.nio.charset.Charset;
/**
* @description:浣跨敤fastjson瀹炵幇redis鐨勫簭鍒楀寲
@@ -31,7 +29,7 @@
if (t == null) {
return new byte[0];
}
- return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET);
+ return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName, JSONWriter.Feature.WritePairAsJavaBean).getBytes(DEFAULT_CHARSET);
}
@Override
@@ -42,4 +40,6 @@
String str = new String(bytes, DEFAULT_CHARSET);
return JSON.parseObject(str, clazz, JSONReader.Feature.SupportAutoType);
}
+
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java
new file mode 100644
index 0000000..0b9d3d9
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java
@@ -0,0 +1,31 @@
+package com.genersoft.iot.vmp.vmanager.bean;
+
+import org.springframework.web.context.request.async.DeferredResult;
+
+public class DeferredResultEx<T> {
+
+ private DeferredResult<T> deferredResult;
+
+ private DeferredResultFilter filter;
+
+ public DeferredResultEx(DeferredResult<T> result) {
+ this.deferredResult = result;
+ }
+
+
+ public DeferredResult<T> getDeferredResult() {
+ return deferredResult;
+ }
+
+ public void setDeferredResult(DeferredResult<T> deferredResult) {
+ this.deferredResult = deferredResult;
+ }
+
+ public DeferredResultFilter getFilter() {
+ return filter;
+ }
+
+ public void setFilter(DeferredResultFilter filter) {
+ this.filter = filter;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java
new file mode 100644
index 0000000..18c2240
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java
@@ -0,0 +1,6 @@
+package com.genersoft.iot.vmp.vmanager.bean;
+
+public interface DeferredResultFilter {
+
+ Object handler(Object o);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
index ae76b95..87e203a 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -1,44 +1,36 @@
package com.genersoft.iot.vmp.vmanager.gb28181.play;
import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
-
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.CrossOrigin;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
-import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
@@ -91,16 +83,52 @@
public DeferredResult<WVPResult<StreamInfo>> play(HttpServletRequest request, @PathVariable String deviceId,
@PathVariable String channelId) {
+ String localAddr = request.getLocalAddr();
+ String localName = request.getLocalName();
+ String remoteHost = request.getRemoteHost();
+ String remoteAddr = request.getRemoteAddr();
+ String remoteUser = request.getRemoteUser();
+ String requestURI = request.getRequestURI();
+ System.out.println(3333333);
+ System.out.println(localAddr);
+ System.out.println(localName);
+ System.out.println(remoteHost);
+ System.out.println(remoteAddr);
+ System.out.println(remoteUser);
+ System.out.println(requestURI);
+ System.out.println(4444444);
// 鑾峰彇鍙敤鐨剒lm
Device device = storager.queryVideoDevice(deviceId);
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
- PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
- playResult.getResult().onCompletion(()->{
- WVPResult<StreamInfo> result = (WVPResult<StreamInfo>)playResult.getResult().getResult();
- result.getData().channgeStreamIp(request.getLocalAddr());
- playResult.getResult().setResult(result);
+
+ RequestMessage msg = new RequestMessage();
+ String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
+ boolean exist = resultHolder.exist(key, null);
+ msg.setKey(key);
+ String uuid = UUID.randomUUID().toString();
+ msg.setId(uuid);
+ DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
+ DeferredResultEx<WVPResult<StreamInfo>> deferredResultEx = new DeferredResultEx<>(result);
+
+ deferredResultEx.setFilter(result1 -> {
+ System.out.println(1111);
+ System.out.println(request.getLocalName());
+ WVPResult<StreamInfo> wvpResult = (WVPResult<StreamInfo>)result1;
+ if (wvpResult.getCode() == ErrorCode.SUCCESS.getCode()) {
+ StreamInfo data = wvpResult.getData();
+ data.channgeStreamIp(request.getLocalName());
+ ((WVPResult<StreamInfo>)result1).setData(data);
+ }
+ return result1;
});
- return playResult.getResult();
+
+ // 褰曞儚鏌ヨ浠hannelId浣滀负deviceId鏌ヨ
+ resultHolder.put(key, uuid, deferredResultEx);
+
+ if (!exist) {
+ playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
+ }
+ return result;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
index 0f003c7..ab769f5 100644
--- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
+++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
@@ -112,7 +112,7 @@
return resultDeferredResult;
}
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
- PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
+ playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code);
JSONObject result = new JSONObject();
result.put("StreamID", streamInfo.getStream());
diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml
index a620859..759b744 100644
--- a/src/main/resources/all-application.yml
+++ b/src/main/resources/all-application.yml
@@ -105,8 +105,6 @@
id: 44010200492000000001
# [鍙�塢 榛樿璁惧璁よ瘉瀵嗙爜锛屽悗缁墿灞曚娇鐢ㄨ澶囧崟鐙瘑鐮�, 绉婚櫎瀵嗙爜灏嗕笉杩涜鏍¢獙
password: admin123
- # [鍙�塢 蹇冭烦瓒呮椂鏃堕棿锛� 寤鸿璁剧疆涓哄績璺冲懆鏈熺殑涓夊��
- keepalive-timeout: 255
# [鍙�塢 鍥芥爣绾ц仈娉ㄥ唽澶辫触锛屽啀娆″彂璧锋敞鍐岀殑鏃堕棿闂撮殧銆� 榛樿60绉�
register-time-interval: 60
# [鍙�塢 浜戝彴鎺у埗閫熷害
--
Gitblit v1.8.0