From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 23 四月 2024 20:59:20 +0800
Subject: [PATCH] Merge branch 'refs/heads/2.7.0'
---
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 20
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 17
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java | 24
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 65 +
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java | 9
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java | 60 +
src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java | 11
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 29
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java | 2
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java | 99 +
数据库/2.7.0/更新-mysql-2.7.0.sql | 12
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java | 30
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 78 +
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 22
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java | 22
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 13
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java | 2
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 4
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java | 4
数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql | 12
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 3
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java | 93 +
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 363 +----
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java | 7
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 12
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java | 225 ++++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 199 +++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 23
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 511 ++++-----
/dev/null | 97 -
web_src/src/components/channelList.vue | 12
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 357 ++++++
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java | 304 +++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 276 ++--
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java | 155 ++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java | 9
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 1
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 29
42 files changed, 2,374 insertions(+), 845 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index df230d4..c4911e4 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -72,6 +72,9 @@
public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
+ public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
+ public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
+ public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:";
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
index a9f5c88..a9b17ae 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -66,7 +66,7 @@
private List<String> allowedOrigins = new ArrayList<>();
- private int maxNotifyCountQueue = 10000;
+ private int maxNotifyCountQueue = 100000;
private int registerAgainAfterTime = 60;
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index c14ebcd..dcf2830 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -29,25 +29,21 @@
private RedisAlarmMsgListener redisAlarmMsgListener;
@Autowired
- private RedisStreamMsgListener redisStreamMsgListener;
-
- @Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
- @Autowired
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@Autowired
private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
- @Autowired
- private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Autowired
private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
+
@Autowired
- private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
+ private RedisRpcConfig redisRpcConfig;
+
+ @Autowired
+ private RedisPushStreamResponseListener redisPushStreamCloseResponseListener;
/**
@@ -64,13 +60,11 @@
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
- container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
- container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
- container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
- container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
+ container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
+ container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
return container;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
new file mode 100644
index 0000000..661e370
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
@@ -0,0 +1,225 @@
+package com.genersoft.iot.vmp.conf.redis;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisRpcConfig implements MessageListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
+
+ public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
+
+ private final Random random = new Random();
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private RedisRpcController redisRpcController;
+
+ @Autowired
+ private RedisTemplate<Object, Object> redisTemplate;
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
+
+ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+ @Override
+ public void onMessage(Message message, byte[] pattern) {
+ boolean isEmpty = taskQueue.isEmpty();
+ taskQueue.offer(message);
+ if (isEmpty) {
+ taskExecutor.execute(() -> {
+ while (!taskQueue.isEmpty()) {
+ Message msg = taskQueue.poll();
+ try {
+ RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
+ if (redisRpcMessage.getRequest() != null) {
+ handlerRequest(redisRpcMessage.getRequest());
+ } else if (redisRpcMessage.getResponse() != null){
+ handlerResponse(redisRpcMessage.getResponse());
+ } else {
+ logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage));
+ }
+ } catch (Exception e) {
+ logger.error("[redis rpc 瑙f瀽寮傚父] ", e);
+ }
+ }
+ });
+ }
+ }
+
+ private void handlerResponse(RedisRpcResponse response) {
+ if (userSetting.getServerId().equals(response.getToId())) {
+ return;
+ }
+ logger.info("[redis-rpc] << {}", response);
+ response(response);
+ }
+
+ private void handlerRequest(RedisRpcRequest request) {
+ try {
+ if (userSetting.getServerId().equals(request.getFromId())) {
+ return;
+ }
+ logger.info("[redis-rpc] << {}", request);
+ Method method = getMethod(request.getUri());
+ // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404
+ if (userSetting.getServerId().equals(request.getToId())) {
+ if (method == null) {
+ // 鍥炲404缁撴灉
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(404);
+ sendResponse(response);
+ return;
+ }
+ RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+ if(response != null) {
+ sendResponse(response);
+ }
+ }else {
+ if (method == null) {
+ return;
+ }
+ RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+ if (response != null) {
+ sendResponse(response);
+ }
+ }
+ }catch (InvocationTargetException | IllegalAccessException e) {
+ logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e);
+ }
+
+ }
+
+ private Method getMethod(String name) {
+ // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ
+ Method[] methods = redisRpcController.getClass().getMethods();
+ for (Method method : methods) {
+ if (method.getName().equals(name)) {
+ return method;
+ }
+ }
+ return null;
+ }
+
+ private void sendResponse(RedisRpcResponse response){
+ logger.info("[redis-rpc] >> {}", response);
+ response.setToId(userSetting.getServerId());
+ RedisRpcMessage message = new RedisRpcMessage();
+ message.setResponse(response);
+ redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+ }
+
+ private void sendRequest(RedisRpcRequest request){
+ logger.info("[redis-rpc] >> {}", request);
+ RedisRpcMessage message = new RedisRpcMessage();
+ message.setRequest(request);
+ redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+ }
+
+
+ private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
+ private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
+
+ public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
+ request.setSn((long) random.nextInt(1000) + 1);
+ SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
+
+ try {
+ sendRequest(request);
+ return subscribe.poll(timeOut, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
+ } finally {
+ this.unsubscribe(request.getSn());
+ }
+ return null;
+ }
+
+ public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
+ request.setSn((long) random.nextInt(1000) + 1);
+ setCallback(request.getSn(), callback);
+ sendRequest(request);
+ }
+
+ public Boolean response(RedisRpcResponse response) {
+ SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
+ CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
+ if (queue != null) {
+ try {
+ return queue.offer(response, 2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.error("{}", e.getMessage(), e);
+ }
+ }else if (callback != null) {
+ callback.run(response);
+ callbacks.remove(response.getSn());
+ }
+ return false;
+ }
+
+ private void unsubscribe(long key) {
+ topicSubscribers.remove(key);
+ }
+
+
+ private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
+ SynchronousQueue<RedisRpcResponse> queue = null;
+ if (!topicSubscribers.containsKey(key))
+ topicSubscribers.put(key, queue = new SynchronousQueue<>());
+ return queue;
+ }
+
+ private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) {
+ // TODO 濡傛灉澶氫釜涓婄骇鐐规挱鍚屼竴涓�氶亾浼氭湁闂
+ callbacks.put(key, callback);
+ }
+
+ public void removeCallback(long key) {
+ callbacks.remove(key);
+ }
+
+
+ public int getCallbackCount(){
+ return callbacks.size();
+ }
+
+// @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆�
+// public void execute(){
+// logger.info("callbacks鐨勯暱搴�: " + callbacks.size());
+// logger.info("闃熷垪鐨勯暱搴�: " + topicSubscribers.size());
+// logger.info("HOOK鐩戝惉鐨勯暱搴�: " + hookSubscribe.size());
+// logger.info("");
+// }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
new file mode 100644
index 0000000..061df6f
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
@@ -0,0 +1,24 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+public class RedisRpcMessage {
+
+ private RedisRpcRequest request;
+
+ private RedisRpcResponse response;
+
+ public RedisRpcRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(RedisRpcRequest request) {
+ this.request = request;
+ }
+
+ public RedisRpcResponse getResponse() {
+ return response;
+ }
+
+ public void setResponse(RedisRpcResponse response) {
+ this.response = response;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
new file mode 100644
index 0000000..a02db67
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
@@ -0,0 +1,93 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 閫氳繃redis鍙戦�佽姹�
+ */
+public class RedisRpcRequest {
+
+ /**
+ * 鏉ヨ嚜鐨刉VP ID
+ */
+ private String fromId;
+
+
+ /**
+ * 鐩爣鐨刉VP ID
+ */
+ private String toId;
+
+ /**
+ * 搴忓垪鍙�
+ */
+ private long sn;
+
+ /**
+ * 璁块棶鐨勮矾寰�
+ */
+ private String uri;
+
+ /**
+ * 鍙傛暟
+ */
+ private Object param;
+
+ public String getFromId() {
+ return fromId;
+ }
+
+ public void setFromId(String fromId) {
+ this.fromId = fromId;
+ }
+
+ public String getToId() {
+ return toId;
+ }
+
+ public void setToId(String toId) {
+ this.toId = toId;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public Object getParam() {
+ return param;
+ }
+
+ public void setParam(Object param) {
+ this.param = param;
+ }
+
+ public long getSn() {
+ return sn;
+ }
+
+ public void setSn(long sn) {
+ this.sn = sn;
+ }
+
+ @Override
+ public String toString() {
+ return "RedisRpcRequest{" +
+ "uri='" + uri + '\'' +
+ ", fromId='" + fromId + '\'' +
+ ", toId='" + toId + '\'' +
+ ", sn=" + sn +
+ ", param=" + param +
+ '}';
+ }
+
+ public RedisRpcResponse getResponse() {
+ RedisRpcResponse response = new RedisRpcResponse();
+ response.setFromId(fromId);
+ response.setToId(toId);
+ response.setSn(sn);
+ response.setUri(uri);
+ return response;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
new file mode 100644
index 0000000..21f9e7e
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
@@ -0,0 +1,99 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 閫氳繃redis鍙戦�佸洖澶�
+ */
+public class RedisRpcResponse {
+
+ /**
+ * 鏉ヨ嚜鐨刉VP ID
+ */
+ private String fromId;
+
+
+ /**
+ * 鐩爣鐨刉VP ID
+ */
+ private String toId;
+
+
+ /**
+ * 搴忓垪鍙�
+ */
+ private long sn;
+
+ /**
+ * 鐘舵�佺爜
+ */
+ private int statusCode;
+
+ /**
+ * 璁块棶鐨勮矾寰�
+ */
+ private String uri;
+
+ /**
+ * 鍙傛暟
+ */
+ private Object body;
+
+ public String getFromId() {
+ return fromId;
+ }
+
+ public void setFromId(String fromId) {
+ this.fromId = fromId;
+ }
+
+ public String getToId() {
+ return toId;
+ }
+
+ public void setToId(String toId) {
+ this.toId = toId;
+ }
+
+ public long getSn() {
+ return sn;
+ }
+
+ public void setSn(long sn) {
+ this.sn = sn;
+ }
+
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ public void setStatusCode(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public Object getBody() {
+ return body;
+ }
+
+ public void setBody(Object body) {
+ this.body = body;
+ }
+
+ @Override
+ public String toString() {
+ return "RedisRpcResponse{" +
+ "uri='" + uri + '\'' +
+ ", fromId='" + fromId + '\'' +
+ ", toId='" + toId + '\'' +
+ ", sn=" + sn +
+ ", statusCode=" + statusCode +
+ ", body=" + body +
+ '}';
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
index 32b6fac..8290a45 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
@@ -187,6 +187,18 @@
private double latitude;
/**
+ * 缁忓害
+ */
+ @Schema(description = "鑷畾涔夌粡搴�")
+ private double customLongitude;
+
+ /**
+ * 绾害
+ */
+ @Schema(description = "鑷畾涔夌含搴�")
+ private double customLatitude;
+
+ /**
* 缁忓害 GCJ02
*/
@Schema(description = "GCJ02鍧愭爣绯荤粡搴�")
@@ -226,7 +238,7 @@
* 鏄惁鍚湁闊抽
*/
@Schema(description = "鏄惁鍚湁闊抽")
- private boolean hasAudio;
+ private Boolean hasAudio;
/**
* 鏍囪閫氶亾鐨勭被鍨嬶紝0->鍥芥爣閫氶亾 1->鐩存挱娴侀�氶亾 2->涓氬姟鍒嗙粍/铏氭嫙缁勭粐/琛屾斂鍖哄垝
@@ -586,4 +598,20 @@
public void setStreamIdentification(String streamIdentification) {
this.streamIdentification = streamIdentification;
}
+
+ public double getCustomLongitude() {
+ return customLongitude;
+ }
+
+ public void setCustomLongitude(double customLongitude) {
+ this.customLongitude = customLongitude;
+ }
+
+ public double getCustomLatitude() {
+ return customLatitude;
+ }
+
+ public void setCustomLatitude(double customLatitude) {
+ this.customLatitude = customLatitude;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index c133c82..55f09df 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -2,6 +2,8 @@
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+
public class SendRtpItem {
/**
@@ -23,6 +25,11 @@
* 骞冲彴id
*/
private String platformId;
+
+ /**
+ * 骞冲彴鍚嶇О
+ */
+ private String platformName;
/**
* 瀵瑰簲璁惧id
@@ -64,6 +71,11 @@
private boolean tcpActive;
/**
+ * 鑷繁鎺ㄦ祦浣跨敤鐨処P
+ */
+ private String localIp;
+
+ /**
* 鑷繁鎺ㄦ祦浣跨敤鐨勭鍙�
*/
private int localPort;
@@ -81,7 +93,7 @@
/**
* invite 鐨� callId
*/
- private String CallId;
+ private String callId;
/**
* invite 鐨� fromTag
@@ -124,6 +136,11 @@
*/
private String receiveStream;
+ /**
+ * 涓婄骇鐨勭偣鎾被鍨�
+ */
+ private String sessionName;
+
public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
@@ -138,7 +155,7 @@
sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
return sendRtpItem;
-
+
}
public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
@@ -262,11 +279,11 @@
}
public String getCallId() {
- return CallId;
+ return callId;
}
public void setCallId(String callId) {
- CallId = callId;
+ this.callId = callId;
}
public InviteStreamType getPlayType() {
@@ -341,6 +358,30 @@
this.receiveStream = receiveStream;
}
+ public String getPlatformName() {
+ return platformName;
+ }
+
+ public void setPlatformName(String platformName) {
+ this.platformName = platformName;
+ }
+
+ public String getLocalIp() {
+ return localIp;
+ }
+
+ public void setLocalIp(String localIp) {
+ this.localIp = localIp;
+ }
+
+ public String getSessionName() {
+ return sessionName;
+ }
+
+ public void setSessionName(String sessionName) {
+ this.sessionName = sessionName;
+ }
+
@Override
public String toString() {
return "SendRtpItem{" +
@@ -348,6 +389,7 @@
", port=" + port +
", ssrc='" + ssrc + '\'' +
", platformId='" + platformId + '\'' +
+ ", platformName='" + platformName + '\'' +
", deviceId='" + deviceId + '\'' +
", app='" + app + '\'' +
", channelId='" + channelId + '\'' +
@@ -355,10 +397,11 @@
", stream='" + stream + '\'' +
", tcp=" + tcp +
", tcpActive=" + tcpActive +
+ ", localIp='" + localIp + '\'' +
", localPort=" + localPort +
", mediaServerId='" + mediaServerId + '\'' +
", serverId='" + serverId + '\'' +
- ", CallId='" + CallId + '\'' +
+ ", CallId='" + callId + '\'' +
", fromTag='" + fromTag + '\'' +
", toTag='" + toTag + '\'' +
", pt=" + pt +
@@ -367,6 +410,18 @@
", rtcp=" + rtcp +
", playType=" + playType +
", receiveStream='" + receiveStream + '\'' +
+ ", sessionName='" + sessionName + '\'' +
'}';
}
+
+ public String getRedisKey() {
+ String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
+ serverId + "_"
+ + mediaServerId + "_"
+ + platformId + "_"
+ + channelId + "_"
+ + stream + "_"
+ + callId;
+ return key;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index 557563e..18ad2b0 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -49,6 +49,7 @@
ParentPlatform parentPlatform = null;
Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
+ Map<String, DeviceChannel> channelMap = new HashMap<>();
if (!ObjectUtils.isEmpty(event.getPlatformId())) {
subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribe == null) {
@@ -67,6 +68,7 @@
for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
+ channelMap.put(deviceChannel.getChannelId(), deviceChannel);
}
}
}else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@
}
logger.info("[Catalog浜嬩欢: {}]骞冲彴锛歿}锛屽奖鍝嶉�氶亾{}", event.getType(), platform.getServerGBId(), gbId);
List<DeviceChannel> deviceChannelList = new ArrayList<>();
- DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
+ DeviceChannel deviceChannel = channelMap.get(gbId);
deviceChannelList.add(deviceChannel);
GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
if(gbStream != null){
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index 1c8353d..9fb1d4d 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -592,6 +592,7 @@
Integer finalIndex = index;
String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
deviceChannels.size(), type, subscribeInfo);
+ System.out.println(catalogXmlContent);
logger.info("[鍙戦�丯OTIFY閫氱煡]绫诲瀷锛� {}锛屽彂閫佹暟閲忥細 {}", type, channels.size());
sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("鍙戦�丯OTIFY閫氱煡娑堟伅澶辫触銆傞敊璇細{} {}", eventResult.statusCode, eventResult.msg);
@@ -621,7 +622,6 @@
private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
StringBuffer catalogXml = new StringBuffer(600);
-
String characterSet = parentPlatform.getCharacterSet();
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
.append("<Notify>\r\n")
@@ -660,6 +660,8 @@
.append("<Owner> " + channel.getOwner()+ "</Owner>\r\n")
.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n")
.append("<Address>" + channel.getAddress() + "</Address>\r\n");
+ catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
+ catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
}
if (!"presence".equals(subscribeInfo.getEventType())) {
catalogXml.append("<Event>" + type + "</Event>\r\n");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
index 7cbfe70..f3f7431 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -6,7 +6,6 @@
import com.google.common.primitives.Bytes;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
-import org.apache.commons.lang3.ArrayUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -172,6 +171,7 @@
return getRootElement(evt, "gb2312");
}
public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
+
if (charset == null) {
charset = "gb2312";
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 10922f4..b76751c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -13,10 +13,11 @@
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
-import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -51,6 +52,8 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+ @Autowired
+ private IRedisRpcService redisRpcService;
@Autowired
private UserSetting userSetting;
@@ -68,9 +71,6 @@
private DynamicTask dynamicTask;
@Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
- @Autowired
private IPlayService playService;
@@ -86,7 +86,7 @@
logger.info("[鏀跺埌ACK]锛� 鏉ヨ嚜->{}", fromUserId);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
if (sendRtpItem == null) {
- logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛岀洰鏍囦负({})鐨勬帹娴佷俊鎭�",fromUserId, toUserId);
+ logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛宑allId: {}", fromUserId, callIdHeader.getCallId());
return;
}
// tcp涓诲姩鏃讹紝姝ゆ椂鏄骇鑱斾笅绾у钩鍙帮紝鍦ㄥ洖澶�200ok鏃讹紝鏈湴宸茬粡璇锋眰zlm寮�鍚洃鍚紝璺宠繃涓嬮潰姝ラ
@@ -106,10 +106,13 @@
if (parentPlatform != null) {
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
- RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
- redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
- playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
- });
+ WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
+ if (wvpResult.getCode() == 0) {
+ RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+ redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
+ playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
+ });
+ }
} else {
try {
if (sendRtpItem.isTcpActive()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 302b694..b8eb703 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -16,10 +16,10 @@
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -91,7 +91,8 @@
private IStreamPushService pushService;
@Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
+ private IRedisRpcService redisRpcService;
+
@Override
public void afterPropertiesSet() throws Exception {
@@ -138,17 +139,8 @@
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
-
- ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
- if (platform != null) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(platform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
- }else {
- logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
- }
+ }else {
+ logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId());
}
}else {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 46e779d..f1f277f 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -31,11 +31,17 @@
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -51,6 +57,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sdp.*;
@@ -64,6 +71,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Vector;
+import java.util.*;
/**
* SIP鍛戒护绫诲瀷锛� INVITE璇锋眰
@@ -92,16 +100,16 @@
private IRedisCatchStorage redisCatchStorage;
@Autowired
- private IInviteStreamService inviteStreamService;
+ private IRedisRpcService redisRpcService;
+
+ @Autowired
+ private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private DynamicTask dynamicTask;
-
- @Autowired
- private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Autowired
private IPlayService playService;
@@ -125,17 +133,16 @@
private UserSetting userSetting;
@Autowired
- private ZLMMediaListManager mediaListManager;
-
- @Autowired
private SipConfig config;
-
-
- @Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
@Autowired
private VideoStreamSessionManager streamSession;
+
+ @Autowired
+ private SendRtpPortManager sendRtpPortManager;
+
+ @Autowired
+ private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Override
@@ -552,43 +559,79 @@
}
} else if (gbStream != null) {
-
- String ssrc;
- if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
- // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
- ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
- }else {
- ssrc = gb28181Sdp.getSsrc();
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
+ sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
}
+ if (tcpActive != null) {
+ sendRtpItem.setTcpActive(tcpActive);
+ }
+ sendRtpItem.setTcp(mediaTransmissionTCP);
+ sendRtpItem.setRtcp(platform.isRtcp());
+ sendRtpItem.setPlatformName(platform.getName());
+ sendRtpItem.setPlatformId(platform.getServerGBId());
+ sendRtpItem.setMediaServerId(mediaServerItem.getId());
+ sendRtpItem.setChannelId(channelId);
+ sendRtpItem.setIp(addressStr);
+ sendRtpItem.setPort(port);
+ sendRtpItem.setUsePs(true);
+ sendRtpItem.setApp(gbStream.getApp());
+ sendRtpItem.setStream(gbStream.getStream());
+ sendRtpItem.setCallId(callIdHeader.getCallId());
+ sendRtpItem.setFromTag(request.getFromTag());
+ sendRtpItem.setOnlyAudio(false);
+ sendRtpItem.setStatus(0);
+ sendRtpItem.setSessionName(sessionName);
+ // 娓呯悊鍙兘瀛樺湪鐨勭紦瀛橀伩鍏嶇敤鍒版棫鐨勬暟鎹�
+ List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream());
+ if (!sendRtpItemList.isEmpty()) {
+ for (SendRtpItem rtpItem : sendRtpItemList) {
+ redisCatchStorage.deleteSendRTPServer(rtpItem);
+ }
+ }
if ("push".equals(gbStream.getStreamType())) {
+ sendRtpItem.setPlayType(InviteStreamType.PUSH);
if (streamPushItem != null) {
// 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦
StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
if (pushListItem != null) {
- pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId()));
- // 鎺ㄦ祦鐘舵��
- pushStream(evt, request, gbStream, pushListItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ sendRtpItem.setServerId(pushListItem.getSeverId());
+ sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
+
+ StreamPushItem transform = streamPushService.transform(pushListItem);
+ transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ // 寮�濮嬫帹娴�
+ sendPushStream(sendRtpItem, mediaServerItem, platform, request);
}else {
- // 鏈帹娴� 鎷夎捣
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ if (!platform.isStartOfflinePush()) {
+ // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
+ try {
+ logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
+ }
+ return;
+ }
+ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
}
} else if ("proxy".equals(gbStream.getStreamType())) {
if (null != proxyByAppAndStream) {
+ if (sendRtpItem.getSsrc() == null) {
+ // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+ String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+ sendRtpItem.setSsrc(ssrc);
+ }
if (proxyByAppAndStream.isStatus()) {
- pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
} else {
//寮�鍚唬鐞嗘媺娴�
- notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
}
-
-
}
}
}
@@ -614,58 +657,13 @@
/**
* 瀹夋帓鎺ㄦ祦
*/
- private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServer mediaServer,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
+ private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady != null && streamReady) {
// 鑷钩鍙板唴瀹�
- SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
- gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
- logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage());
- }
- return;
- }
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
- sendRtpItem.setFromTag(request.getFromTag());
-
- SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
- }
-
- }
-
- private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServer mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- // 鎺ㄦ祦
- if (streamPushItem.isSelf()) {
- Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
- if (streamReady != null && streamReady) {
- // 鑷钩鍙板唴瀹�
- SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
- gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
try {
responseAck(request, Response.BUSY_HERE);
@@ -674,226 +672,197 @@
}
return;
}
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
+ sendRtpItem.setPlayType(InviteStreamType.PROXY);
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+
+ SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
+ if (response != null) {
+ sendRtpItem.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ }
+ }
+
+ private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ // 鎺ㄦ祦
+ if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
+ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+ if (streamReady != null && streamReady) {
+ // 鑷钩鍙板唴瀹�
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
+ logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
+ try {
+ responseAck(request, Response.BUSY_HERE);
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage());
+ }
+ return;
}
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
// 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
+ if (sendRtpItem.getSsrc() == null) {
+ // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+ String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+ sendRtpItem.setSsrc(ssrc);
+ }
redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
} else {
// 涓嶅湪绾� 鎷夎捣
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
-
} else {
// 鍏朵粬骞冲彴鍐呭
- otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ otherWvpPushStream(sendRtpItem, request, platform);
}
}
/**
* 閫氱煡娴佷笂绾�
*/
- private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServer mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- if ("proxy".equals(gbStream.getStreamType())) {
- // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
- logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
- // 鐩戝惉娴佷笂绾�
- Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId());
- this.hookSubscribe.addSubscribe(hook, (hookData) -> {
- logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", hookData.getApp(), hookData.getStream());
- dynamicTask.stop(callIdHeader.getCallId());
- pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- });
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
- this.hookSubscribe.removeSubscribe(hook);
- }, userSetting.getPlatformPlayTimeout());
- boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
- if (!start) {
- try {
- responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
- }
- this.hookSubscribe.removeSubscribe(hook);
- dynamicTask.stop(callIdHeader.getCallId());
+ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎
+ logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream());
+ // 鐩戝惉娴佷笂绾�
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
+ zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
+ OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
+ logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
+ dynamicTask.stop(sendRtpItem.getCallId());
+ sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
+ });
+ dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+ logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
+ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+ }, userSetting.getPlatformPlayTimeout());
+ boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
+ if (!start) {
+ try {
+ responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
}
- } else if ("push".equals(gbStream.getStreamType())) {
- if (!platform.isStartOfflinePush()) {
- // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲
- try {
- logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
- responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage());
- }
- return;
- }
- // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
- logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream());
-
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
- gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
- platform.getName(), null, gbStream.getMediaServerId());
- redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
- // 璁剧疆瓒呮椂
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream());
- try {
- redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
- mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
- responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- }
- }, userSetting.getPlatformPlayTimeout());
- // 娣诲姞鐩戝惉
- int finalPort = port;
- Boolean finalTcpActive = tcpActive;
-
- // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡
- mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
- dynamicTask.stop(callIdHeader.getCallId());
- redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
- if (serverId.equals(userSetting.getServerId())) {
- SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
- app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
- logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (InvalidArgumentException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (ParseException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- }
- return;
- }
- if (finalTcpActive != null) {
- sendRtpItem.setTcpActive(finalTcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
- } else {
- // 鍏朵粬骞冲彴鍐呭
- otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- }
- });
-
- // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
- redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
- if (response.getCode() != 0) {
- dynamicTask.stop(callIdHeader.getCallId());
- mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
- try {
- responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
- }
- }
- });
+ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+ dynamicTask.stop(sendRtpItem.getCallId());
}
}
/**
- * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴�
+ * 閫氱煡娴佷笂绾�
*/
- private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServer mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅");
- // 鍙戦�乺edis娑堟伅
- redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
- streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
- channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
- SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
- if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
- logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (InvalidArgumentException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- } catch (ParseException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
- }
- return;
- }
- // 鏀跺埌sendItem
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
- }, (wvpResult) -> {
-
- // 閿欒
- if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
- // 绂荤嚎
- // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡
- StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
- if (currentStreamPushItem.isPushIng()) {
- // 鍦ㄧ嚎鐘舵��
- pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-
- } else {
- // 涓嶅湪绾� 鎷夎捣
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- }
- }
+ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤
+ logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream());
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
+ platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+ // 璁剧疆瓒呮椂
+ dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+ redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
+ logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream());
+ try {
+ responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ }
+ }, userSetting.getPlatformPlayTimeout());
+ //
+ long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
+ dynamicTask.stop(sendRtpItem.getCallId());
+ if (sendRtpItemKey == null) {
+ logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦寰楀埌缁撴灉鏈┖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ try {
+ responseAck(request, Response.BUSY_HERE);
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ }
+ return;
+ }
+ SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
+ if (sendRtpItemFromRedis == null) {
+ logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦, 鏈壘鍒皉edis涓紦瀛樼殑鍙戞祦淇℃伅锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ try {
+ responseAck(request, Response.BUSY_HERE);
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ }
+ return;
+ }
+ if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
+ logger.info("[绾ц仈鐐规挱] 绛夊緟鐨勬帹娴佸湪鏈钩鍙颁笂绾� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
+ logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�");
try {
responseAck(request, Response.BUSY_HERE);
- } catch (InvalidArgumentException | ParseException | SipException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
}
- });
+ return;
+ }
+ sendRtpItem.setLocalPort(localPort);
+ if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
+ sendRtpItem.setLocalIp(platform.getSendStreamIp());
+ }
+
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+ if (response != null) {
+ sendRtpItem.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ } else {
+ // 鍏朵粬骞冲彴鍐呭
+ otherWvpPushStream(sendRtpItemFromRedis, request, platform);
+ }
+ });
+ // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡
+ // redis娑堟伅渚嬪锛� PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"澶辫触","app":"1","stream":"2"}'
+ redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
+ if (response.getCode() != 0) {
+ dynamicTask.stop(sendRtpItem.getCallId());
+ redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
+ redisRpcService.removeCallback(key);
+ try {
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage());
+ }
+ }
+ });
}
- public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
- String sdpIp = mediaServerItem.getSdpIp();
+
+ /**
+ * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴�
+ */
+ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
+ logger.info("[绾ц仈鐐规挱] 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey());
+ if (sendRtpItem == null) {
+ return;
+ }
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+ if (response != null) {
+ sendRtpItem.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ }
+
+ public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
+
+ String sdpIp = sendRtpItem.getLocalIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
sdpIp = platform.getSendStreamIp();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
index cd97786..de93804 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
-import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
-import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -20,10 +18,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,8 +37,6 @@
private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
- private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
- private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -60,275 +56,110 @@
private IDeviceChannelService deviceChannelService;
@Autowired
- private DynamicTask dynamicTask;
-
- @Autowired
- private CivilCodeFileConf civilCodeFileConf;
-
- @Autowired
private SipConfig sipConfig;
- private final static String talkKey = "notify-request-for-catalog-task";
+ @Transactional
+ public void process(List<RequestEvent> evtList) {
+ if (evtList.isEmpty()) {
+ return;
+ }
+ for (RequestEvent evt : evtList) {
+ try {
+ long start = System.currentTimeMillis();
+ FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
- public void process(RequestEvent evt) {
- try {
- long start = System.currentTimeMillis();
- FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
- String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+ Device device = redisCatchStorage.getDevice(deviceId);
+ if (device == null || !device.isOnLine()) {
+ logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" ));
+ return;
+ }
+ Element rootElement = getRootElement(evt, device.getCharset());
+ if (rootElement == null) {
+ logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest());
+ return;
+ }
+ Element deviceListElement = rootElement.element("DeviceList");
+ if (deviceListElement == null) {
+ return;
+ }
+ Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
+ if (deviceListIterator != null) {
- Device device = redisCatchStorage.getDevice(deviceId);
- if (device == null || !device.isOnLine()) {
- logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" ));
- return;
- }
- Element rootElement = getRootElement(evt, device.getCharset());
- if (rootElement == null) {
- logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest());
- return;
- }
- Element deviceListElement = rootElement.element("DeviceList");
- if (deviceListElement == null) {
- return;
- }
- Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
- if (deviceListIterator != null) {
-
- // 閬嶅巻DeviceList
- while (deviceListIterator.hasNext()) {
- Element itemDevice = deviceListIterator.next();
- Element channelDeviceElement = itemDevice.element("DeviceID");
- if (channelDeviceElement == null) {
- continue;
- }
- Element eventElement = itemDevice.element("Event");
- String event;
- if (eventElement == null) {
- logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" ));
- event = CatalogEvent.ADD;
- }else {
- event = eventElement.getText().toUpperCase();
- }
- DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
- if (channel == null) {
- logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent()));
- continue;
- }
- if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
- channel.setParentId(null);
- }
- channel.setDeviceId(device.getDeviceId());
- logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId());
- switch (event) {
- case CatalogEvent.ON:
- // 涓婄嚎
- logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- updateChannelOnlineList.add(channel);
- if (updateChannelOnlineList.size() > 300) {
- executeSaveForOnline();
- }
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
- }
-
- break;
- case CatalogEvent.OFF :
- // 绂荤嚎
- logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
- logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- }else {
- updateChannelOfflineList.add(channel);
- if (updateChannelOfflineList.size() > 300) {
- executeSaveForOffline();
- }
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
- }
- }
- break;
- case CatalogEvent.VLOST:
- // 瑙嗛涓㈠け
- logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
- logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- }else {
- updateChannelOfflineList.add(channel);
- if (updateChannelOfflineList.size() > 300) {
- executeSaveForOffline();
- }
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
- }
- }
- break;
- case CatalogEvent.DEFECT:
- // 鏁呴殰
- logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
- logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- }else {
- updateChannelOfflineList.add(channel);
- if (updateChannelOfflineList.size() > 300) {
- executeSaveForOffline();
- }
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
- }
- }
- break;
- case CatalogEvent.ADD:
- // 澧炲姞
- logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪
- DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
- if (deviceChannel != null) {
- logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- channel.setId(deviceChannel.getId());
- updateChannelMap.put(channel.getChannelId(), channel);
- if (updateChannelMap.keySet().size() > 300) {
- executeSaveForUpdate();
- }
- }else {
- addChannelMap.put(channel.getChannelId(), channel);
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
- }
-
- if (addChannelMap.keySet().size() > 300) {
- executeSaveForAdd();
- }
- }
-
- break;
- case CatalogEvent.DEL:
- // 鍒犻櫎
- logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- deleteChannelList.add(channel);
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
- }
- if (deleteChannelList.size() > 300) {
- executeSaveForDelete();
- }
- break;
- case CatalogEvent.UPDATE:
- // 鏇存柊
- logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
- // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪
- DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
- if (deviceChannelForUpdate != null) {
- channel.setId(deviceChannelForUpdate.getId());
- channel.setUpdateTime(DateUtil.getNow());
- updateChannelMap.put(channel.getChannelId(), channel);
- if (updateChannelMap.keySet().size() > 300) {
- executeSaveForUpdate();
- }
- }else {
- addChannelMap.put(channel.getChannelId(), channel);
- if (addChannelMap.keySet().size() > 300) {
- executeSaveForAdd();
- }
- if (userSetting.getDeviceStatusNotify()) {
- // 鍙戦�乺edis娑堟伅
- redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
- }
- }
- break;
- default:
- logger.warn("[ NotifyCatalog ] event not found 锛� {}", event );
-
- }
- // 杞彂鍙樺寲淇℃伅
- eventPublisher.catalogEventPublish(null, channel, event);
-
- if (!updateChannelMap.keySet().isEmpty()
- || !addChannelMap.keySet().isEmpty()
- || !updateChannelOnlineList.isEmpty()
- || !updateChannelOfflineList.isEmpty()
- || !deleteChannelList.isEmpty()) {
-
- if (!dynamicTask.contains(talkKey)) {
- dynamicTask.startDelay(talkKey, this::executeSave, 1000);
+ // 閬嶅巻DeviceList
+ while (deviceListIterator.hasNext()) {
+ Element itemDevice = deviceListIterator.next();
+ Element eventElement = itemDevice.element("Event");
+ String event;
+ if (eventElement == null) {
+ logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" ));
+ event = CatalogEvent.ADD;
+ }else {
+ event = eventElement.getText().toUpperCase();
}
+ DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
+ if (channel == null) {
+ logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent()));
+ continue;
+ }
+ if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
+ channel.setParentId(null);
+ }
+ channel.setDeviceId(device.getDeviceId());
+ logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
+ switch (event) {
+ case CatalogEvent.ON:
+ // 涓婄嚎
+ deviceChannelService.online(channel);
+ if (userSetting.getDeviceStatusNotify()) {
+ // 鍙戦�乺edis娑堟伅
+ redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
+ }
+
+ break;
+ case CatalogEvent.OFF :
+ case CatalogEvent.VLOST:
+ case CatalogEvent.DEFECT:
+ // 绂荤嚎
+ if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
+ logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId());
+ }else {
+ deviceChannelService.offline(channel);
+ if (userSetting.getDeviceStatusNotify()) {
+ // 鍙戦�乺edis娑堟伅
+ redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
+ }
+ }
+ break;
+ case CatalogEvent.DEL:
+ // 鍒犻櫎
+ deviceChannelService.delete(channel);
+ if (userSetting.getDeviceStatusNotify()) {
+ // 鍙戦�乺edis娑堟伅
+ redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
+ }
+ break;
+ case CatalogEvent.ADD:
+ case CatalogEvent.UPDATE:
+ // 鏇存柊
+ channel.setUpdateTime(DateUtil.getNow());
+ deviceChannelService.updateChannel(deviceId,channel);
+ if (userSetting.getDeviceStatusNotify()) {
+ // 鍙戦�乺edis娑堟伅
+ redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
+ }
+ break;
+ default:
+ logger.warn("[ NotifyCatalog ] event not found 锛� {}", event );
+
+ }
+ // 杞彂鍙樺寲淇℃伅
+ eventPublisher.catalogEventPublish(null, channel, event);
}
}
+ } catch (DocumentException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
}
- } catch (DocumentException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
}
}
-
- private void executeSave(){
- try {
- executeSaveForAdd();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e );
- }
- try {
- executeSaveForUpdate();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e );
- }
- try {
- executeSaveForDelete();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e );
- }
- try {
- executeSaveForOnline();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e );
- }
- try {
- executeSaveForOffline();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e );
- }
- dynamicTask.stop(talkKey);
- }
-
- private void executeSaveForUpdate(){
- if (!updateChannelMap.values().isEmpty()) {
- ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
- updateChannelMap.clear();
- deviceChannelService.batchUpdateChannel(deviceChannels);
- }
-
- }
-
- private void executeSaveForAdd(){
- if (!addChannelMap.values().isEmpty()) {
- ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
- addChannelMap.clear();
- deviceChannelService.batchAddChannel(deviceChannels);
- }
- }
-
- private void executeSaveForDelete(){
- if (!deleteChannelList.isEmpty()) {
- deviceChannelService.deleteChannels(deleteChannelList);
- deleteChannelList.clear();
- }
- }
-
- private void executeSaveForOnline(){
- if (!updateChannelOnlineList.isEmpty()) {
- deviceChannelService.channelsOnline(updateChannelOnlineList);
- updateChannelOnlineList.clear();
- }
- }
-
- private void executeSaveForOffline(){
- if (!updateChannelOfflineList.isEmpty()) {
- deviceChannelService.channelsOffline(updateChannelOfflineList);
- updateChannelOfflineList.clear();
- }
- }
-
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
new file mode 100755
index 0000000..05a5336
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -0,0 +1,199 @@
+package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.ObjectUtils;
+
+import javax.sip.RequestEvent;
+import javax.sip.header.FromHeader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊
+ */
+@Component
+public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent {
+
+
+ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
+
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private EventPublisher eventPublisher;
+
+ @Autowired
+ private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private IDeviceChannelService deviceChannelService;
+
+ @Transactional
+ public void process(List<RequestEvent> eventList) {
+ if (eventList.isEmpty()) {
+ return;
+ }
+ Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
+ List<MobilePosition> addMobilePositionList = new ArrayList<>();
+ for (RequestEvent evt : eventList) {
+ try {
+ FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+ long startTime = System.currentTimeMillis();
+ // 鍥炲 200 OK
+ Element rootElement = getRootElement(evt);
+ if (rootElement == null) {
+ logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest());
+ return;
+ }
+ Device device = redisCatchStorage.getDevice(deviceId);
+ if (device == null) {
+ logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒癲evice,{}", deviceId);
+ return;
+ }
+ MobilePosition mobilePosition = new MobilePosition();
+ mobilePosition.setDeviceId(device.getDeviceId());
+ mobilePosition.setDeviceName(device.getName());
+ mobilePosition.setCreateTime(DateUtil.getNow());
+ List<Element> elements = rootElement.elements();
+ for (Element element : elements) {
+ switch (element.getName()){
+ case "DeviceID":
+ String channelId = element.getStringValue();
+ if (!deviceId.equals(channelId)) {
+ mobilePosition.setChannelId(channelId);
+ }
+ continue;
+ case "Time":
+ String timeVal = element.getStringValue();
+ if (ObjectUtils.isEmpty(timeVal)) {
+ mobilePosition.setTime(DateUtil.getNow());
+ } else {
+ mobilePosition.setTime(SipUtils.parseTime(timeVal));
+ }
+ continue;
+ case "Longitude":
+ mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
+ continue;
+ case "Latitude":
+ mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
+ continue;
+ case "Speed":
+ String speedVal = element.getStringValue();
+ if (NumericUtil.isDouble(speedVal)) {
+ mobilePosition.setSpeed(Double.parseDouble(speedVal));
+ } else {
+ mobilePosition.setSpeed(0.0);
+ }
+ continue;
+ case "Direction":
+ String directionVal = element.getStringValue();
+ if (NumericUtil.isDouble(directionVal)) {
+ mobilePosition.setDirection(Double.parseDouble(directionVal));
+ } else {
+ mobilePosition.setDirection(0.0);
+ }
+ continue;
+ case "Altitude":
+ String altitudeVal = element.getStringValue();
+ if (NumericUtil.isDouble(altitudeVal)) {
+ mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
+ } else {
+ mobilePosition.setAltitude(0.0);
+ }
+ continue;
+
+ }
+ }
+
+// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
+ mobilePosition.setReportSource("Mobile Position");
+
+ // 鏇存柊device channel 鐨勭粡绾害
+ DeviceChannel deviceChannel = new DeviceChannel();
+ deviceChannel.setDeviceId(device.getDeviceId());
+ deviceChannel.setLongitude(mobilePosition.getLongitude());
+ deviceChannel.setLatitude(mobilePosition.getLatitude());
+ deviceChannel.setGpsTime(mobilePosition.getTime());
+ updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
+ addMobilePositionList.add(mobilePosition);
+
+
+ // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭�
+ try {
+ eventPublisher.mobilePositionEventPublish(mobilePosition);
+ }catch (Exception e) {
+ logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e);
+ }
+ if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) {
+ List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
+ channels.forEach(channel -> {
+ // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+ jsonObject.put("serial", channel.getDeviceId());
+ jsonObject.put("code", channel.getChannelId());
+ jsonObject.put("longitude", mobilePosition.getLongitude());
+ jsonObject.put("latitude", mobilePosition.getLatitude());
+ jsonObject.put("altitude", mobilePosition.getAltitude());
+ jsonObject.put("direction", mobilePosition.getDirection());
+ jsonObject.put("speed", mobilePosition.getSpeed());
+ redisCatchStorage.sendMobilePositionMsg(jsonObject);
+ });
+ }else {
+ // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖�
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
+ jsonObject.put("serial", mobilePosition.getDeviceId());
+ jsonObject.put("code", mobilePosition.getChannelId());
+ jsonObject.put("longitude", mobilePosition.getLongitude());
+ jsonObject.put("latitude", mobilePosition.getLatitude());
+ jsonObject.put("altitude", mobilePosition.getAltitude());
+ jsonObject.put("direction", mobilePosition.getDirection());
+ jsonObject.put("speed", mobilePosition.getSpeed());
+ redisCatchStorage.sendMobilePositionMsg(jsonObject);
+ }
+ } catch (DocumentException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ }
+ }
+ if(!updateChannelMap.isEmpty()) {
+ List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
+ logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size());
+ deviceChannelService.batchUpdateChannelGPS(channels);
+ updateChannelMap.clear();
+ }
+ if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
+ try {
+ logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size());
+ deviceChannelService.batchAddMobilePosition(addMobilePositionList);
+ }catch (Exception e) {
+ logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size());
+ }
+ addMobilePositionList.clear();
+ }
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index e54aa2d..8a42624 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -35,6 +37,7 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -76,6 +79,9 @@
@Autowired
private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
+ @Autowired
+ private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
+
private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@@ -97,61 +103,73 @@
responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue());
return;
- }else {
+ } else {
responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
}
- }catch (SipException | InvalidArgumentException | ParseException e) {
+ } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("鏈鐞嗙殑寮傚父 ", e);
}
boolean runed = !taskQueue.isEmpty();
- logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size());
taskQueue.offer(new HandlerCatchData(evt, null, null));
- if (!runed) {
- taskExecutor.execute(()-> {
- while (!taskQueue.isEmpty()) {
- try {
- HandlerCatchData take = taskQueue.poll();
- if (take == null) {
- continue;
- }
- Element rootElement = getRootElement(take.getEvt());
- if (rootElement == null) {
- logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest());
- continue;
- }
- String cmd = XmlUtil.getText(rootElement, "CmdType");
-
- if (CmdType.CATALOG.equals(cmd)) {
- logger.info("鎺ユ敹鍒癈atalog閫氱煡");
- notifyRequestForCatalogProcessor.process(take.getEvt());
- } else if (CmdType.ALARM.equals(cmd)) {
- logger.info("鎺ユ敹鍒癆larm閫氱煡");
- processNotifyAlarm(take.getEvt());
- } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
- logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
- processNotifyMobilePosition(take.getEvt());
- } else {
- logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
- }
- } catch (DocumentException e) {
- logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e);
- }
+ }
+ @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆�
+ public void executeTaskQueue(){
+ if (taskQueue.isEmpty()) {
+ return;
+ }
+ try {
+ List<RequestEvent> catalogEventList = new ArrayList<>();
+ List<RequestEvent> alarmEventList = new ArrayList<>();
+ List<RequestEvent> mobilePositionEventList = new ArrayList<>();
+ for (HandlerCatchData take : taskQueue) {
+ if (take == null) {
+ continue;
}
- });
+ Element rootElement = getRootElement(take.getEvt());
+ if (rootElement == null) {
+ logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest());
+ continue;
+ }
+ String cmd = XmlUtil.getText(rootElement, "CmdType");
+
+ if (CmdType.CATALOG.equals(cmd)) {
+ catalogEventList.add(take.getEvt());
+ } else if (CmdType.ALARM.equals(cmd)) {
+ alarmEventList.add(take.getEvt());
+ } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+ mobilePositionEventList.add(take.getEvt());
+ } else {
+ logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
+ }
+ }
+ taskQueue.clear();
+ if (!alarmEventList.isEmpty()) {
+ processNotifyAlarm(alarmEventList);
+ }
+ if (!catalogEventList.isEmpty()) {
+ notifyRequestForCatalogProcessor.process(catalogEventList);
+ }
+ if (!mobilePositionEventList.isEmpty()) {
+ notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
+ }
+ } catch (DocumentException e) {
+ logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e);
}
}
+
+
/**
* 澶勭悊MobilePosition绉诲姩浣嶇疆Notify
*
* @param evt
*/
- private void processNotifyMobilePosition(RequestEvent evt) {
+ @Async("taskExecutor")
+ public void processNotifyMobilePosition(RequestEvent evt) {
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
// 鍥炲 200 OK
Element rootElement = getRootElement(evt);
if (rootElement == null) {
@@ -179,6 +197,13 @@
if (device == null) {
logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId);
return;
+ }
+ // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌
+ if(deviceId.equals(channelId)) {
+ List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
+ if (deviceChannels.size() == 1) {
+ channelId = deviceChannels.get(0).getChannelId();
+ }
}
if (!ObjectUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
@@ -210,8 +235,8 @@
} else {
mobilePosition.setAltitude(0.0);
}
- logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
- mobilePosition.getLongitude(), mobilePosition.getLatitude());
+// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+// mobilePosition.getLongitude(), mobilePosition.getLatitude());
mobilePosition.setReportSource("Mobile Position");
// 鏇存柊device channel 鐨勭粡绾害
@@ -221,12 +246,12 @@
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
- deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
-
- mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
- mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
- mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
- mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+// deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+//
+// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
@@ -237,95 +262,97 @@
/***
* 澶勭悊alarm璁惧鎶ヨNotify
- *
- * @param evt
*/
- private void processNotifyAlarm(RequestEvent evt) {
+ private void processNotifyAlarm(List<RequestEvent> evtList) {
if (!sipConfig.isAlarm()) {
return;
}
- try {
- FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
- String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
+ if (!evtList.isEmpty()) {
+ for (RequestEvent evt : evtList) {
+ try {
+ FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
+ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
- Element rootElement = getRootElement(evt);
- if (rootElement == null) {
- logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest());
- return;
- }
- Element deviceIdElement = rootElement.element("DeviceID");
- String channelId = deviceIdElement.getText().toString();
+ Element rootElement = getRootElement(evt);
+ if (rootElement == null) {
+ logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest());
+ return;
+ }
+ Element deviceIdElement = rootElement.element("DeviceID");
+ String channelId = deviceIdElement.getText().toString();
- Device device = redisCatchStorage.getDevice(deviceId);
- if (device == null) {
- logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId);
- return;
- }
- rootElement = getRootElement(evt, device.getCharset());
- if (rootElement == null) {
- logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
- return;
- }
- DeviceAlarm deviceAlarm = new DeviceAlarm();
- deviceAlarm.setDeviceId(deviceId);
- deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
- deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
- String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
- if (alarmTime == null) {
- logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
- return;
- }
- deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
- if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
- deviceAlarm.setAlarmDescription("");
- } else {
- deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
- }
- if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
- deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
- } else {
- deviceAlarm.setLongitude(0.00);
- }
- if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
- deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
- } else {
- deviceAlarm.setLatitude(0.00);
- }
- logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
- if ("4".equals(deviceAlarm.getAlarmMethod())) {
- MobilePosition mobilePosition = new MobilePosition();
- mobilePosition.setChannelId(channelId);
- mobilePosition.setCreateTime(DateUtil.getNow());
- mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
- mobilePosition.setTime(deviceAlarm.getAlarmTime());
- mobilePosition.setLongitude(deviceAlarm.getLongitude());
- mobilePosition.setLatitude(deviceAlarm.getLatitude());
- mobilePosition.setReportSource("GPS Alarm");
+ Device device = redisCatchStorage.getDevice(deviceId);
+ if (device == null) {
+ logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId);
+ return;
+ }
+ rootElement = getRootElement(evt, device.getCharset());
+ if (rootElement == null) {
+ logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
+ return;
+ }
+ DeviceAlarm deviceAlarm = new DeviceAlarm();
+ deviceAlarm.setDeviceId(deviceId);
+ deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
+ deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
+ String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
+ if (alarmTime == null) {
+ logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
+ return;
+ }
+ deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
+ if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
+ deviceAlarm.setAlarmDescription("");
+ } else {
+ deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
+ }
+ if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
+ deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
+ } else {
+ deviceAlarm.setLongitude(0.00);
+ }
+ if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
+ deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
+ } else {
+ deviceAlarm.setLatitude(0.00);
+ }
+ logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
+ if ("4".equals(deviceAlarm.getAlarmMethod())) {
+ MobilePosition mobilePosition = new MobilePosition();
+ mobilePosition.setChannelId(channelId);
+ mobilePosition.setCreateTime(DateUtil.getNow());
+ mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
+ mobilePosition.setTime(deviceAlarm.getAlarmTime());
+ mobilePosition.setLongitude(deviceAlarm.getLongitude());
+ mobilePosition.setLatitude(deviceAlarm.getLatitude());
+ mobilePosition.setReportSource("GPS Alarm");
- // 鏇存柊device channel 鐨勭粡绾害
- DeviceChannel deviceChannel = new DeviceChannel();
- deviceChannel.setDeviceId(device.getDeviceId());
- deviceChannel.setChannelId(channelId);
- deviceChannel.setLongitude(mobilePosition.getLongitude());
- deviceChannel.setLatitude(mobilePosition.getLatitude());
- deviceChannel.setGpsTime(mobilePosition.getTime());
+ // 鏇存柊device channel 鐨勭粡绾害
+ DeviceChannel deviceChannel = new DeviceChannel();
+ deviceChannel.setDeviceId(device.getDeviceId());
+ deviceChannel.setChannelId(channelId);
+ deviceChannel.setLongitude(mobilePosition.getLongitude());
+ deviceChannel.setLatitude(mobilePosition.getLatitude());
+ deviceChannel.setGpsTime(mobilePosition.getTime());
- deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
+ deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
- mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
- mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
- mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
- mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
+ mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
+ mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
+ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
+ mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
- deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
+ deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
+ }
+
+ // 鍥炲200 OK
+ if (redisCatchStorage.deviceIsOnline(deviceId)) {
+ publisher.deviceAlarmEventPublish(deviceAlarm);
+ }
+ } catch (DocumentException e) {
+ logger.error("鏈鐞嗙殑寮傚父 ", e);
+ }
}
-
- // 鍥炲200 OK
- if (redisCatchStorage.deviceIsOnline(deviceId)) {
- publisher.deviceAlarmEventPublish(deviceAlarm);
- }
- } catch (DocumentException e) {
- logger.error("鏈鐞嗙殑寮傚父 ", e);
}
}
@@ -353,4 +380,9 @@
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
this.redisCatchStorage = redisCatchStorage;
}
+
+ @Scheduled(fixedRate = 10000) //姣�1绉掓墽琛屼竴娆�
+ public void execute(){
+ logger.info("[寰呭鐞哊otify娑堟伅鏁伴噺]: {}", taskQueue.size());
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
index 70702bb..6596f53 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
@@ -532,16 +532,17 @@
String status = getText(itemDevice, "Status");
if (status != null) {
// ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR鐨勫吋瀹规�у鐞�
- if (status.equals("ON") || status.equals("On") || status.equals("ONLINE") || status.equals("OK")) {
+ if (status.equalsIgnoreCase("ON") || status.equalsIgnoreCase("On") || status.equalsIgnoreCase("ONLINE") || status.equalsIgnoreCase("OK")) {
deviceChannel.setStatus(true);
}
- if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
+ if (status.equalsIgnoreCase("OFF") || status.equalsIgnoreCase("Off") || status.equalsIgnoreCase("OFFLINE")) {
deviceChannel.setStatus(false);
}
}else {
deviceChannel.setStatus(true);
}
-
+// logger.info("鐘舵�佸瓧绗︿覆锛� {}", status);
+// logger.info("鐘舵�佺粨鏋滐細 {}", deviceChannel.isStatus());
// 缁忓害
String longitude = getText(itemDevice, "Longitude");
if (NumericUtil.isDouble(longitude)) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index f5b1b6c..030dd7d 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -5,6 +5,8 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
+import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -21,6 +23,8 @@
import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent;
import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent;
import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
@@ -66,6 +70,10 @@
@Autowired
private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private IRedisRpcService redisRpcService;
+
@Autowired
private IInviteStreamService inviteStreamService;
@@ -86,9 +94,6 @@
@Autowired
private EventPublisher eventPublisher;
-
- @Autowired
- private ZLMMediaListManager zlmMediaListManager;
@Autowired
private HookSubscribe subscribe;
@@ -117,6 +122,9 @@
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
+
+ @Autowired
+ private IStreamPushService streamPushService;
/**
* 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -172,6 +180,54 @@
if (mediaServer == null) {
return new HookResultForOnPublish(0, "success");
}
+ // 鎺ㄦ祦閴存潈鐨勫鐞�
+ if (!"rtp".equals(param.getApp())) {
+ StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
+ if (stream != null) {
+ HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
+ result.setEnable_audio(stream.isEnableAudio());
+ result.setEnable_mp4(stream.isEnableMp4());
+ return result;
+ }
+ if (userSetting.getPushAuthority()) {
+ // 鎺ㄦ祦閴存潈
+ if (param.getParams() == null) {
+ logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
+ return new HookResultForOnPublish(401, "Unauthorized");
+ }
+ Map<String, String> paramMap = urlParamToMap(param.getParams());
+ String sign = paramMap.get("sign");
+ if (sign == null) {
+ logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)");
+ return new HookResultForOnPublish(401, "Unauthorized");
+ }
+ // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜
+ String callId = paramMap.get("callId");
+ // 閴存潈閰嶇疆
+ boolean hasAuthority = userService.checkPushAuthority(callId, sign);
+ if (!hasAuthority) {
+ logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign);
+ return new HookResultForOnPublish(401, "Unauthorized");
+ }
+ StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
+ streamAuthorityInfo.setCallId(callId);
+ streamAuthorityInfo.setSign(sign);
+ // 閴存潈閫氳繃
+ redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
+ }
+ } else {
+ zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
+ }
+
+
+ HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
+ result.setEnable_audio(true);
+ taskExecutor.execute(() -> {
+ ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
+ if (subscribe != null) {
+ subscribe.response(mediaInfo, param);
+ }
+ });
ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
if (resultForOnPublish != null) {
@@ -207,6 +263,221 @@
MediaDepartureEvent mediaDepartureEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
applicationEventPublisher.publishEvent(mediaDepartureEvent);
}
+
+ JSONObject json = (JSONObject) JSON.toJSON(param);
+ taskExecutor.execute(() -> {
+ ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
+ MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
+ if (mediaInfo == null) {
+ logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId());
+ return;
+ }
+ if (subscribe != null) {
+ subscribe.response(mediaInfo, param);
+ }
+
+ List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
+ // TODO 閲嶆瀯姝ゅ閫昏緫
+ if (param.isRegist()) {
+ // 澶勭悊娴佹敞鍐岀殑閴存潈淇℃伅锛� 娴佹敞閿�杩欓噷涓嶅啀鍒犻櫎閴存潈淇℃伅锛屼笅娆℃潵浜嗘柊鐨勯壌鏉冧俊鎭細瀵瑰氨鐨勮繘琛岃鐩�
+ if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+ || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+ || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
+ StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
+ if (streamAuthorityInfo == null) {
+ streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
+ } else {
+ streamAuthorityInfo.setOriginType(param.getOriginType());
+ streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr());
+ }
+ redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
+ }
+ }
+ if ("rtsp".equals(param.getSchema())) {
+ logger.info("娴佸彉鍖栵細娉ㄥ唽->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
+ if (param.isRegist()) {
+ mediaServerService.addCount(param.getMediaServerId());
+ } else {
+ mediaServerService.removeCount(param.getMediaServerId());
+ }
+
+ int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
+ if (updateStatusResult > 0) {
+
+ }
+
+ if ("rtp".equals(param.getApp()) && !param.isRegist()) {
+ InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
+ if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
+ inviteStreamService.removeInviteInfo(inviteInfo);
+ storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
+ }
+ } else if ("broadcast".equals(param.getApp())) {
+ // 璇煶瀵硅鎺ㄦ祦 stream闇�瑕佹弧瓒虫牸寮廳eviceId_channelId
+ if (param.getStream().indexOf("_") > 0) {
+ String[] streamArray = param.getStream().split("_");
+ if (streamArray.length == 2) {
+ String deviceId = streamArray[0];
+ String channelId = streamArray[1];
+ Device device = deviceService.getDevice(deviceId);
+ if (device != null) {
+ if (param.isRegist()) {
+ if (audioBroadcastManager.exit(deviceId, channelId)) {
+ playService.stopAudioBroadcast(deviceId, channelId);
+ }
+ // 寮�鍚闊冲璁查�氶亾
+ try {
+ playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> {
+ logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId);
+ });
+ } catch (InvalidArgumentException | ParseException | SipException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 璇煶瀵硅: {}", e.getMessage());
+ }
+ } else {
+ // 娴佹敞閿�
+ playService.stopAudioBroadcast(deviceId, channelId);
+ }
+ } else {
+ logger.info("[璇煶瀵硅] 鏈壘鍒拌澶囷細{}", deviceId);
+ }
+ }
+ }
+ } else if ("talk".equals(param.getApp())) {
+ // 璇煶瀵硅鎺ㄦ祦 stream闇�瑕佹弧瓒虫牸寮廳eviceId_channelId
+ if (param.getStream().indexOf("_") > 0) {
+ String[] streamArray = param.getStream().split("_");
+ if (streamArray.length == 2) {
+ String deviceId = streamArray[0];
+ String channelId = streamArray[1];
+ Device device = deviceService.getDevice(deviceId);
+ if (device != null) {
+ if (param.isRegist()) {
+ if (audioBroadcastManager.exit(deviceId, channelId)) {
+ playService.stopAudioBroadcast(deviceId, channelId);
+ }
+ // 寮�鍚闊冲璁查�氶亾
+ playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> {
+ logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId);
+ });
+ } else {
+ // 娴佹敞閿�
+ playService.stopTalk(device, channelId, param.isRegist());
+ }
+ } else {
+ logger.info("[璇煶瀵硅] 鏈壘鍒拌澶囷細{}", deviceId);
+ }
+ }
+ }
+
+ } else {
+ if (!"rtp".equals(param.getApp())) {
+ String type = OriginType.values()[param.getOriginType()].getType();
+ if (param.isRegist()) {
+ StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
+ param.getApp(), param.getStream());
+ String callId = null;
+ if (streamAuthorityInfo != null) {
+ callId = streamAuthorityInfo.getCallId();
+ }
+ StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
+ param.getApp(), param.getStream(), tracks, callId);
+ param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
+ redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
+ if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+ || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+ || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
+ param.setSeverId(userSetting.getServerId());
+ zlmMediaListManager.addPush(param);
+
+ // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
+ redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
+ }
+ } else {
+ // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇
+ OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
+ param.getApp(), param.getStream(), param.getMediaServerId());
+ if (onStreamChangedHookParam != null) {
+ type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
+ redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
+ if ("PUSH".equalsIgnoreCase(type)) {
+ // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
+ redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId());
+ }
+ }
+ GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
+ if (gbStream != null) {
+// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+ }
+ zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
+ }
+ GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
+ if (gbStream != null) {
+ if (userSetting.isUsePushingAsStatus()) {
+ eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
+ }
+ }
+ if (type != null) {
+ // 鍙戦�佹祦鍙樺寲redis娑堟伅
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetting.getServerId());
+ jsonObject.put("app", param.getApp());
+ jsonObject.put("stream", param.getStream());
+ jsonObject.put("register", param.isRegist());
+ jsonObject.put("mediaServerId", param.getMediaServerId());
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ }
+ }
+ }
+ if (!param.isRegist()) {
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
+ if (!sendRtpItems.isEmpty()) {
+ for (SendRtpItem sendRtpItem : sendRtpItems) {
+ if (sendRtpItem == null) {
+ continue;
+ }
+
+ if (sendRtpItem.getApp().equals(param.getApp())) {
+ logger.info(sendRtpItem.toString());
+ if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
+ // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+ redisCatchStorage.sendPushStreamClose(messageForPushChannel);
+ }else {
+ String platformId = sendRtpItem.getPlatformId();
+ ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+ Device device = deviceService.getDevice(platformId);
+
+ try {
+ if (platform != null) {
+ commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+ redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+ sendRtpItem.getCallId(), sendRtpItem.getStream());
+ } else {
+ cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+ if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+ || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ if (audioBroadcastCatch != null) {
+ // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+ logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ }
+ }
+ }
+ } catch (SipException | InvalidArgumentException | ParseException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+ }
+ }
+
+ }
+ }
+ }
+ }
+ }
+ });
return HookResult.SUCCESS();
}
@@ -220,6 +491,62 @@
logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
param.getApp(), param.getStream());
JSONObject ret = new JSONObject();
+ ret.put("code", 0);
+ // 鍥芥爣绫诲瀷鐨勬祦
+ if ("rtp".equals(param.getApp())) {
+ ret.put("close", userSetting.getStreamOnDemand());
+ // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇
+ InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
+ // 鐐规挱
+ if (inviteInfo != null) {
+ // 褰曞儚涓嬭浇
+ if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
+ ret.put("close", false);
+ return ret;
+ }
+ // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ��
+ if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
+ inviteInfo.getChannelId());
+ if (!sendRtpItems.isEmpty()) {
+ for (SendRtpItem sendRtpItem : sendRtpItems) {
+ ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+ try {
+ commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+ }
+ redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getCallId(), sendRtpItem.getStream());
+ if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+ redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+ }
+ }
+ }
+ }
+ Device device = deviceService.getDevice(inviteInfo.getDeviceId());
+ if (device != null) {
+ try {
+ // 澶氭煡璇竴娆¢槻姝㈠凡缁忚澶勭悊浜�
+ InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
+ inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
+ if (info != null) {
+ cmder.streamByeCmd(device, inviteInfo.getChannelId(),
+ inviteInfo.getStream(), null);
+ } else {
+ logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囩殑鐐规挱淇℃伅锛� {}锛� 娴侊細{}", inviteInfo.getDeviceId(), param.getStream());
+ }
+ } catch (InvalidArgumentException | ParseException | SipException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[鏃犱汉瑙傜湅]鐐规挱锛� 鍙戦�丅YE澶辫触 {}", e.getMessage());
+ }
+ } else {
+ logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囷細 {}锛屾祦锛歿}", inviteInfo.getDeviceId(), param.getStream());
+ }
boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema());
ret.put("code", close);
@@ -282,16 +609,22 @@
if (!"rtp".equals(param.getApp())) {
return HookResult.SUCCESS();
}
- try {
- MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this);
- MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
- if (mediaServerItem != null) {
- event.setMediaServer(mediaServerItem);
- applicationEventPublisher.publishEvent(event);
+ taskExecutor.execute(() -> {
+ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
+ if (sendRtpItems.size() > 0) {
+ for (SendRtpItem sendRtpItem : sendRtpItems) {
+ ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+ try {
+ commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+ }
+ redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getCallId(), sendRtpItem.getStream());
+ }
}
- }catch (Exception e) {
- logger.info("[ZLM-HOOK-rtp鍙戦�佸叧闂璢 鍙戦�侀�氱煡澶辫触 ", e);
- }
+ });
return HookResult.SUCCESS();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
deleted file mode 100755
index 80699d2..0000000
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package com.genersoft.iot.vmp.media.zlm;
-
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author lin
- */
-@Component
-public class ZLMMediaListManager {
-
- private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager");
-
- @Autowired
- private IVideoManagerStorage storager;
-
- @Autowired
- private GbStreamMapper gbStreamMapper;
-
- @Autowired
- private IStreamPushService streamPushService;
-
-
- @Autowired
- private StreamPushMapper streamPushMapper;
-
- @Autowired
- private UserSetting userSetting;
-
-
- @Autowired
- private IMediaServerService mediaServerService;
-
- private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
-
- public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
- StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam);
- StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
- transform.setPushIng(onStreamChangedHookParam.isRegist());
- transform.setUpdateTime(DateUtil.getNow());
- transform.setPushTime(DateUtil.getNow());
- transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId()));
- if (pushInDb == null) {
- transform.setCreateTime(DateUtil.getNow());
- streamPushMapper.add(transform);
- }else {
- streamPushMapper.update(transform);
- gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId());
- }
- ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
- if ( channelOnlineEventLister != null) {
- try {
- channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
- } catch (ParseException e) {
- logger.error("addPush: ", e);
- }
- removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
- }
- return transform;
- }
-
- public void sendStreamEvent(String app, String stream, String mediaServerId) {
- MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
- // 鏌ョ湅鎺ㄦ祦鐘舵��
- Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream);
- if (streamReady != null && streamReady) {
- ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream);
- if (channelOnlineEventLister != null) {
- try {
- channelOnlineEventLister.run(app, stream, mediaServerId);
- } catch (ParseException e) {
- logger.error("sendStreamEvent: ", e);
- }
- removedChannelOnlineEventLister(app, stream);
- }
- }
- }
-
- public int removeMedia(String app, String streamId) {
- // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
- GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
- int result;
- if (gbStream == null) {
- result = storager.removeMedia(app, streamId);
- }else {
- result =storager.mediaOffline(app, streamId);
- }
- return result;
- }
-
- public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
- this.channelOnPublishEvents.put(app + "_" + stream, callback);
- }
-
- public void removedChannelOnlineEventLister(String app, String stream) {
- this.channelOnPublishEvents.remove(app + "_" + stream);
- }
-
- public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
- return this.channelOnPublishEvents.get(app + "_" + stream);
- }
-
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
index 3ba48d3..7b0bf39 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -264,4 +264,13 @@
}
return result;
}
+
+ public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) {
+ Map<String, Object> param = new HashMap<>();
+ param.put("vhost", "__defaultVhost__");
+ param.put("app", sendRtpItem.getApp());
+ param.put("stream", sendRtpItem.getStream());
+ param.put("ssrc", sendRtpItem.getSsrc());
+ return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
index 714838e..6b3c94f 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
import java.text.ParseException;
/**
@@ -7,5 +9,5 @@
*/
public interface ChannelOnlineEvent {
- void run(String app, String stream, String serverId) throws ParseException;
+ void run(SendRtpItem sendRtpItem) throws ParseException;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
index 16ff831..3ea8e5f 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -99,4 +99,13 @@
void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
void stopPlay(String deviceId, String channelId);
+ void batchUpdateChannelGPS(List<DeviceChannel> channelList);
+
+ void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
+
+ void online(DeviceChannel channel);
+
+ void offline(DeviceChannel channel);
+
+ void delete(DeviceChannel channel);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index 1507331..c718681 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -115,4 +115,5 @@
Map<String, StreamPushItem> getAllAppAndStreamMap();
+ void updatePush(OnStreamChangedHookParam param);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 1a9e3e5..6a4f866 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -61,6 +61,7 @@
messageForPushChannel.setGbId(gbId);
messageForPushChannel.setApp(app);
messageForPushChannel.setStream(stream);
+ messageForPushChannel.setServerId(serverId);
messageForPushChannel.setMediaServerId(mediaServerId);
messageForPushChannel.setPlatFormId(platFormId);
messageForPushChannel.setPlatFormName(platFormName);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
index 6807632..16fc5b5 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
@@ -248,8 +249,24 @@
}
@Override
+ public void online(DeviceChannel channel) {
+ channelMapper.online(channel.getDeviceId(), channel.getChannelId());
+ }
+
+ @Override
public int channelsOffline(List<DeviceChannel> channels) {
return channelMapper.batchOffline(channels);
+ }
+
+
+ @Override
+ public void offline(DeviceChannel channel) {
+ channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
+ }
+
+ @Override
+ public void delete(DeviceChannel channel) {
+ channelMapper.del(channel.getDeviceId(), channel.getChannelId());
}
@Override
@@ -358,4 +375,47 @@
public void stopPlay(String deviceId, String channelId) {
channelMapper.stopPlay(deviceId, channelId);
}
+
+ @Override
+ @Transactional
+ public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
+ for (DeviceChannel deviceChannel : channelList) {
+ deviceChannel.setUpdateTime(DateUtil.getNow());
+ if (deviceChannel.getGpsTime() == null) {
+ deviceChannel.setGpsTime(DateUtil.getNow());
+ }
+ }
+ int count = 1000;
+ if (channelList.size() > count) {
+ for (int i = 0; i < channelList.size(); i+=count) {
+ int toIndex = i+count;
+ if ( i + count > channelList.size()) {
+ toIndex = channelList.size();
+ }
+ List<DeviceChannel> channels = channelList.subList(i, toIndex);
+ channelMapper.batchUpdatePosition(channels);
+ }
+ }else {
+ channelMapper.batchUpdatePosition(channelList);
+ }
+ }
+
+ @Override
+ @Transactional
+ public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
+// int count = 500;
+// if (mobilePositions.size() > count) {
+// for (int i = 0; i < mobilePositions.size(); i+=count) {
+// int toIndex = i+count;
+// if ( i + count > mobilePositions.size()) {
+// toIndex = mobilePositions.size();
+// }
+// List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
+// deviceMobilePositionMapper.batchadd(mobilePositionsSub);
+// }
+// }else {
+// deviceMobilePositionMapper.batchadd(mobilePositions);
+// }
+ deviceMobilePositionMapper.batchadd(mobilePositions);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 8fb772d..47b902e 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -558,6 +558,7 @@
removeMobilePositionSubscribe(deviceInStore, result->{
// 寮�鍚闃�
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+ deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
// 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹�
deviceMapper.updateCustom(deviceInStore);
@@ -566,12 +567,14 @@
}else {
// 寮�鍚闃�
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+ deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
// 鍙栨秷璁㈤槄
deviceInStore.setSubscribeCycleForMobilePosition(0);
+ deviceInStore.setMobilePositionSubmissionInterval(0);
removeMobilePositionSubscribe(deviceInStore, null);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 40de0d2..cd47062 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -28,7 +28,6 @@
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@@ -111,6 +110,14 @@
@Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener;
+
+
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
@Autowired
private SSRCFactory ssrcFactory;
@@ -266,7 +273,6 @@
logger.warn("[鐐规挱] 鏈壘鍒板彲鐢ㄧ殑zlm deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm");
}
-
Device device = redisCatchStorage.getDevice(deviceId);
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
logger.warn("[鐐规挱] 鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦 deviceId: {},channelId:{}", deviceId, channelId);
@@ -280,6 +286,8 @@
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null ) {
if (inviteInfo.getStreamInfo() == null) {
+ // 閲婃斁鐢熸垚鐨剆src锛屼娇鐢ㄤ笂涓�娆$敵璇风殑
+ ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
// 鐐规挱鍙戣捣浜嗕絾鏄皻鏈垚鍔�, 浠呮敞鍐屽洖璋冪瓑寰呯粨鏋滃嵆鍙�
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
logger.info("[鐐规挱寮�濮媇 宸茬粡璇锋眰涓紝绛夊緟缁撴灉锛� deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 7a14940..a64b71a 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -633,4 +633,21 @@
public Map<String, StreamPushItem> getAllAppAndStreamMap() {
return streamPushMapper.getAllAppAndStreamMap();
}
+
+ @Override
+ public void updatePush(OnStreamChangedHookParam param) {
+ StreamPushItem transform = transform(param);
+ StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
+ transform.setPushIng(param.isRegist());
+ transform.setUpdateTime(DateUtil.getNow());
+ transform.setPushTime(DateUtil.getNow());
+ transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
+ if (pushInDb == null) {
+ transform.setCreateTime(DateUtil.getNow());
+ streamPushMapper.add(transform);
+ }else {
+ streamPushMapper.update(transform);
+ gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
+ }
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
new file mode 100644
index 0000000..b4bd72c
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -0,0 +1,22 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+
+public interface IRedisRpcService {
+
+ SendRtpItem getSendRtpItem(String sendRtpItemKey);
+
+ WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem);
+
+ WVPResult stopSendRtp(String sendRtpItemKey);
+
+ long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
+
+ void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
+
+ void rtpSendStopped(String sendRtpItemKey);
+
+ void removeCallback(long key);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
deleted file mode 100755
index 68595a8..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ /dev/null
@@ -1,451 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.conf.exception.ControllerException;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.event.hook.Hook;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.event.hook.HookType;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾�
- * @author lin
- */
-@Component
-public class RedisGbPlayMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
-
- public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
-
- /**
- * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥
- */
- public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
-
- /**
- * 绂荤嚎鐨勯敊璇帥
- */
- public static final int ERROR_CODE_OFFLINE = -2;
-
- /**
- * 瓒呮椂鐨勯敊璇帥
- */
- public static final int ERROR_CODE_TIMEOUT = -3;
-
- private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
- private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
- private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
-
- @Autowired
- private UserSetting userSetting;
-
-
- @Autowired
- private RedisTemplate<Object, Object> redisTemplate;
-
- @Autowired
- private IMediaServerService mediaServerService;
-
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
-
-
- @Autowired
- private DynamicTask dynamicTask;
-
-
- @Autowired
- private HookSubscribe subscribe;
-
- private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
- @Qualifier("taskExecutor")
- @Autowired
- private ThreadPoolTaskExecutor taskExecutor;
-
-
- public interface PlayMsgCallback{
- void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
- }
-
- public interface PlayMsgCallbackForStartSendRtpStream{
- void handler();
- }
-
- public interface PlayMsgErrorCallback{
- void handler(WVPResult wvpResult);
- }
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- boolean isEmpty = taskQueue.isEmpty();
- taskQueue.offer(message);
- if (isEmpty) {
- taskExecutor.execute(() -> {
- while (!taskQueue.isEmpty()) {
- Message msg = taskQueue.poll();
- try {
- WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
- logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg));
- if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
- continue;
- }
- if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
- logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody()));
-
- switch (wvpRedisMsg.getCmd()){
- case WvpRedisMsgCmd.GET_SEND_ITEM:
- RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
- requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
- RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
- requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
- RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
- requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
- break;
- default:
- break;
- }
-
- }else {
- logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody()));
- switch (wvpRedisMsg.getCmd()){
- case WvpRedisMsgCmd.GET_SEND_ITEM:
-
- WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
-
- String key = wvpRedisMsg.getSerial();
- switch (content.getCode()) {
- case 0:
- ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
- PlayMsgCallback playMsgCallback = callbacks.get(key);
- if (playMsgCallback != null) {
- callbacksForError.remove(key);
- try {
- playMsgCallback.handler(responseSendItemMsg);
- } catch (ParseException e) {
- logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e);
- }
- }
- break;
- case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
- case ERROR_CODE_OFFLINE:
- case ERROR_CODE_TIMEOUT:
- PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
- if (errorCallback != null) {
- callbacks.remove(key);
- errorCallback.handler(content);
- }
- break;
- default:
- break;
- }
- break;
- case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
- WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
- String serial = wvpRedisMsg.getSerial();
- switch (wvpResult.getCode()) {
- case 0:
- PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
- if (playMsgCallback != null) {
- callbacksForError.remove(serial);
- playMsgCallback.handler();
- }
- break;
- case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
- case ERROR_CODE_OFFLINE:
- case ERROR_CODE_TIMEOUT:
- PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
- if (errorCallback != null) {
- callbacks.remove(serial);
- errorCallback.handler(wvpResult);
- }
- break;
- default:
- break;
- }
- break;
- default:
- break;
- }
-
- }
- }catch (Exception e) {
- logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
- logger.error("[RedisGbPlayMsg] 寮傚父鍐呭锛� ", e);
- }
- }
- });
- }
- }
-
- /**
- * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
- */
- private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
- MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
- if (mediaServer == null) {
- // TODO 鍥炲閿欒
- return;
- }
- SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
-
- try {
- mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
- }catch (ControllerException e) {
- return;
- }
-
- // 鍥炲娑堟伅
- WVPResult<JSONObject> result = new WVPResult<>();
- result.setCode(0);
-
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
- WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹�
- */
- private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
- MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
- if (mediaServerItem == null) {
- logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId());
-
- WVPResult<SendRtpItem> result = new WVPResult<>();
- result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
- result.setMsg("娴佸獟浣撲笉瀛樺湪");
-
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
- WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- return;
- }
- // 纭畾娴佹槸鍚﹀湪绾�
- Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
- if (streamReady != null && streamReady) {
- logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream());
- responseSendItem(mediaServerItem, content, toId, serial);
- }else {
- // 娴佸凡缁忕绾�
- // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎
- logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream());
-
- String taskKey = UUID.randomUUID().toString();
- // 璁剧疆瓒呮椂
- dynamicTask.startDelay(taskKey, ()->{
- logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream());
- WVPResult<SendRtpItem> result = new WVPResult<>();
- result.setCode(ERROR_CODE_TIMEOUT);
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
- userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
- );
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }, userSetting.getPlatformPlayTimeout());
-
- // 娣诲姞璁㈤槄
- Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId());
- subscribe.addSubscribe(hook, (hookData)->{
- dynamicTask.stop(taskKey);
- responseSendItem(mediaServerItem, content, toId, serial);
- });
-
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
- content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
- content.getMediaServerId());
-
- String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
- logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚姹� {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream());
- redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
- }
- }
-
- /**
- * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘�
- */
- private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
- SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
- content.getPort(), content.getSsrc(), content.getPlatformId(),
- content.getApp(), content.getStream(), content.getChannelId(),
- content.getTcp(), content.getRtcp());
-
- WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
- result.setCode(0);
- ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
- responseSendItemMsg.setSendRtpItem(sendRtpItem);
- responseSendItemMsg.setMediaServerItem(mediaServerItem);
- result.setData(responseSendItemMsg);
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
- WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
- userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
- );
- JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭�
- * @param serverId 涓嬬骇鏈嶅姟ID
- * @param app 搴旂敤鍚�
- * @param stream 娴両D
- * @param ip 鐩爣IP
- * @param port 鐩爣绔彛
- * @param ssrc ssrc
- * @param platformId 骞冲彴鍥芥爣缂栧彿
- * @param channelId 閫氶亾ID
- * @param isTcp 鏄惁浣跨敤TCP
- * @param callback 寰楀埌淇℃伅鐨勫洖璋�
- */
- public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
- String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
- RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
- serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
- requestSendItemMsg.setServerId(serverId);
- String key = UUID.randomUUID().toString();
- WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
- key, JSON.toJSONString(requestSendItemMsg));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
- logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject);
- callbacks.put(key, callback);
- callbacksForError.put(key, errorCallback);
- dynamicTask.startDelay(key, ()->{
- callbacks.remove(key);
- callbacksForError.remove(key);
- WVPResult<Object> wvpResult = new WVPResult<>();
- wvpResult.setCode(ERROR_CODE_TIMEOUT);
- wvpResult.setMsg("timeout");
- errorCallback.handler(wvpResult);
- }, userSetting.getPlatformPlayTimeout());
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 鍙戦�佽姹傛帹娴佺殑娑堟伅
- * @param param 鎺ㄦ祦鍙傛暟
- * @param callback 鍥炶皟
- */
- public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
- String key = UUID.randomUUID().toString();
- WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
- WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
- logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject);
- dynamicTask.startDelay(key, ()->{
- callbacksForStartSendRtpStream.remove(key);
- callbacksForError.remove(key);
- }, userSetting.getPlatformPlayTimeout());
- callbacksForStartSendRtpStream.put(key, callback);
- callbacksForError.put(key, (wvpResult)->{
- logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg());
- callbacksForStartSendRtpStream.remove(key);
- callbacksForError.remove(key);
- });
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- /**
- * 鍙戦�佽姹傛帹娴佺殑娑堟伅
- */
- public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
- String key = UUID.randomUUID().toString();
- WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
- WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
-
- JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
- logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject);
- redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
- }
-
- private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
- if (platformGbId == null) {
- platformGbId = "*";
- }
- if (channelId == null) {
- channelId = "*";
- }
- if (streamId == null) {
- streamId = "*";
- }
- if (callId == null) {
- callId = "*";
- }
- String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
- + userSetting.getServerId() + "_*_"
- + platformGbId + "_"
- + channelId + "_"
- + streamId + "_"
- + callId;
- List<Object> scan = RedisUtil.scan(redisTemplate, key);
- if (scan.size() > 0) {
- return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
- }else {
- return null;
- }
- }
-
- /**
- * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰
- */
- private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
- SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
- if (sendRtpItem == null) {
- logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL");
- return;
- }
- MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- if (mediaInfo == null) {
- // TODO 鍥炲閿欒
- return;
- }
-
- if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
- logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
- // 鍙戦�乺edis娑堟伅
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
- }
-
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
deleted file mode 100755
index fcfeff3..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰
- * @author lin
- */
-@Component
-public class RedisPushStreamCloseResponseListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
-
- @Autowired
- private IStreamPushService streamPushService;
-
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
-
- @Autowired
- private IVideoManagerStorage storager;
-
- @Autowired
- private ISIPCommanderForPlatform commanderFroPlatform;
-
- @Autowired
- private UserSetting userSetting;
-
- @Autowired
- private IMediaServerService mediaServerService;
-
-
- private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
- public interface PushStreamResponseEvent{
- void run(MessageForPushChannelResponse response);
- }
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody()));
- MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
- StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
- if (push != null) {
- List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
- push.getGbId());
- if (!sendRtpItems.isEmpty()) {
- for (SendRtpItem sendRtpItem : sendRtpItems) {
- ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
- if (parentPlatform != null) {
- redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
- try {
- commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
- }
- }
- if (push.isSelf()) {
- // 鍋滄鍚戜笂绾ф帹娴�
- logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", sendRtpItem.getStream());
- MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
- mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
- if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
- sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
- messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
- redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
- }
- }
- }
- }
- }
-
- }
-
- public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
- responseEvents.put(app + stream, callback);
- }
-
- public void removeEvent(String app, String stream) {
- responseEvents.remove(app + stream);
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
old mode 100755
new mode 100644
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
deleted file mode 100755
index 85709a7..0000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡
- * @author lin
- */
-@Component
-public class RedisStreamMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
-
- @Autowired
- private UserSetting userSetting;
-
- @Autowired
- private ZLMMediaListManager zlmMediaListManager;
-
- private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
- @Qualifier("taskExecutor")
- @Autowired
- private ThreadPoolTaskExecutor taskExecutor;
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- boolean isEmpty = taskQueue.isEmpty();
- taskQueue.offer(message);
- if (isEmpty) {
- taskExecutor.execute(() -> {
- while (!taskQueue.isEmpty()) {
- Message msg = taskQueue.poll();
- try {
- JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
- if (steamMsgJson == null) {
- logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触");
- continue;
- }
- String serverId = steamMsgJson.getString("serverId");
-
- if (userSetting.getServerId().equals(serverId)) {
- // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲
- continue;
- }
- logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody()));
- String app = steamMsgJson.getString("app");
- String stream = steamMsgJson.getString("stream");
- boolean register = steamMsgJson.getBoolean("register");
- String mediaServerId = steamMsgJson.getString("mediaServerId");
- OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
- onStreamChangedHookParam.setSeverId(serverId);
- onStreamChangedHookParam.setApp(app);
- onStreamChangedHookParam.setStream(stream);
- onStreamChangedHookParam.setRegist(register);
- onStreamChangedHookParam.setMediaServerId(mediaServerId);
- onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
- onStreamChangedHookParam.setAliveSecond(0L);
- onStreamChangedHookParam.setTotalReaderCount(0);
- onStreamChangedHookParam.setOriginType(0);
- onStreamChangedHookParam.setOriginTypeStr("0");
- onStreamChangedHookParam.setOriginTypeStr("unknown");
- ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
- if ( channelOnlineEventLister != null) {
- try {
- channelOnlineEventLister.run(app, stream, serverId);;
- } catch (ParseException e) {
- logger.error("addPush: ", e);
- }
- zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
- }
- }catch (Exception e) {
- logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
- logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e);
- }
- }
- });
- }
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
new file mode 100644
index 0000000..af79204
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -0,0 +1,304 @@
+package com.genersoft.iot.vmp.service.redisMsg.control;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.SipException;
+import java.text.ParseException;
+
+/**
+ * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢�
+ */
+@Component
+public class RedisRpcController {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
+
+ @Autowired
+ private SSRCFactory ssrcFactory;
+
+ @Autowired
+ private IMediaServerService mediaServerService;
+
+ @Autowired
+ private SendRtpPortManager sendRtpPortManager;
+
+ @Autowired
+ private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
+
+ @Autowired
+ private ZLMServerFactory zlmServerFactory;
+
+
+ @Autowired
+ private RedisTemplate<Object, Object> redisTemplate;
+
+
+ @Autowired
+ private ISIPCommanderForPlatform commanderFroPlatform;
+
+
+ @Autowired
+ private IVideoManagerStorage storager;
+
+
+ /**
+ * 鑾峰彇鍙戞祦鐨勪俊鎭�
+ */
+ public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
+ String sendRtpItemKey = request.getParam().toString();
+ SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+ if (sendRtpItem == null) {
+ logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ return response;
+ }
+ logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+ // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦
+ MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+ if (mediaServerItem == null) {
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ }
+ // 鑷钩鍙板唴瀹�
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
+ logger.info("[redis-rpc] getSendRtpItem->鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�" );
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ }
+ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶�
+ sendRtpItem.setStatus(1);
+ sendRtpItem.setServerId(userSetting.getServerId());
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+ if (sendRtpItem.getSsrc() == null) {
+ // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+ String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+ sendRtpItem.setSsrc(ssrc);
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem);
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ response.setBody(sendRtpItemKey);
+ return response;
+ }
+
+ /**
+ * 鐩戝惉娴佷笂绾�
+ */
+ public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
+ SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
+ logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+ // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦
+ MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+ if (mediaServerItem != null) {
+ logger.info("[redis-rpc] 鐩戝惉娴佷笂绾挎椂鍙戠幇娴佸凡瀛樺湪鐩存帴杩斿洖锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+ // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+ if (sendRtpItem.getSsrc() == null) {
+ // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+ String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+ sendRtpItem.setSsrc(ssrc);
+ }
+ sendRtpItem.setMediaServerId(mediaServerItem.getId());
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+ sendRtpItem.setServerId(userSetting.getServerId());
+
+ redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+ RedisRpcResponse response = request.getResponse();
+ response.setBody(sendRtpItem.getRedisKey());
+ response.setStatusCode(200);
+ }
+ // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+
+ hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+ logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡紝娴佸凡涓婄嚎锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+ // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+ if (sendRtpItem.getSsrc() == null) {
+ // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+ String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+ sendRtpItem.setSsrc(ssrc);
+ }
+ sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+ sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+ sendRtpItem.setServerId(userSetting.getServerId());
+
+ redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+ RedisRpcResponse response = request.getResponse();
+ response.setBody(sendRtpItem.getRedisKey());
+ response.setStatusCode(200);
+ // 鎵嬪姩鍙戦�佺粨鏋�
+ sendResponse(response);
+ hookSubscribe.removeSubscribe(hook);
+
+ });
+ return null;
+ }
+
+ /**
+ * 鍋滄鐩戝惉娴佷笂绾�
+ */
+ public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
+ SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
+ logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+ // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+ hookSubscribe.removeSubscribe(hook);
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ return response;
+ }
+
+
+ /**
+ * 寮�濮嬪彂娴�
+ */
+ public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
+ String sendRtpItemKey = request.getParam().toString();
+ SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ if (sendRtpItem == null) {
+ logger.info("[redis-rpc] 寮�濮嬪彂娴�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+ WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅");
+ response.setBody(wvpResult);
+ return response;
+ }
+ logger.info("[redis-rpc] 寮�濮嬪彂娴侊細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+ MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ if (mediaServerItem == null) {
+ logger.info("[redis-rpc] startSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() );
+ WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer");
+ response.setBody(wvpResult);
+ return response;
+ }
+
+ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+ if (!streamReady) {
+ logger.info("[redis-rpc] startSendRtp->娴佷笉鍦ㄧ嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+ WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "娴佷笉鍦ㄧ嚎");
+ response.setBody(wvpResult);
+ return response;
+ }
+ JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
+ if (jsonObject.getInteger("code") == 0) {
+ logger.info("[redis-rpc] 鍙戞祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
+ WVPResult wvpResult = WVPResult.success();
+ response.setBody(wvpResult);
+ }else {
+ logger.info("[redis-rpc] 鍙戞祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject);
+ WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
+ response.setBody(wvpResult);
+ }
+ return response;
+ }
+
+ /**
+ * 鍋滄鍙戞祦
+ */
+ public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
+ String sendRtpItemKey = request.getParam().toString();
+ SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ if (sendRtpItem == null) {
+ logger.info("[redis-rpc] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+ WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅");
+ response.setBody(wvpResult);
+ return response;
+ }
+ logger.info("[redis-rpc] 鍋滄鎺ㄦ祦锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+ MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ if (mediaServerItem == null) {
+ logger.info("[redis-rpc] stopSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() );
+ WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer");
+ response.setBody(wvpResult);
+ return response;
+ }
+ JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
+ if (jsonObject.getInteger("code") == 0) {
+ logger.info("[redis-rpc] 鍋滄鎺ㄦ祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+ response.setBody(WVPResult.success());
+ return response;
+ }else {
+ int code = jsonObject.getInteger("code");
+ String msg = jsonObject.getString("msg");
+ logger.info("[redis-rpc] 鍋滄鎺ㄦ祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� code锛� {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg );
+ response.setBody(WVPResult.fail(code, msg));
+ return response;
+ }
+ }
+
+ /**
+ * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜�
+ */
+ public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
+ String sendRtpItemKey = request.getParam().toString();
+ SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
+ RedisRpcResponse response = request.getResponse();
+ response.setStatusCode(200);
+ if (sendRtpItem == null) {
+ logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+ return response;
+ }
+ logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
+ String platformId = sendRtpItem.getPlatformId();
+ ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+ if (platform == null) {
+ return response;
+ }
+ try {
+ commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+ redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+ sendRtpItem.getCallId(), sendRtpItem.getStream());
+ redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+ }
+ return response;
+ }
+
+ private void sendResponse(RedisRpcResponse response){
+ logger.info("[redis-rpc] >> {}", response);
+ response.setToId(userSetting.getServerId());
+ RedisRpcMessage message = new RedisRpcMessage();
+ message.setResponse(response);
+ redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
new file mode 100644
index 0000000..8ffb562
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -0,0 +1,155 @@
+package com.genersoft.iot.vmp.service.redisMsg.service;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+public class RedisRpcServiceImpl implements IRedisRpcService {
+
+ private final static Logger logger = LoggerFactory.getLogger(RedisRpcServiceImpl.class);
+
+ @Autowired
+ private RedisRpcConfig redisRpcConfig;
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
+
+ @Autowired
+ private SSRCFactory ssrcFactory;
+
+ @Autowired
+ private RedisTemplate<Object, Object> redisTemplate;
+
+ private RedisRpcRequest buildRequest(String uri, Object param) {
+ RedisRpcRequest request = new RedisRpcRequest();
+ request.setFromId(userSetting.getServerId());
+ request.setParam(param);
+ request.setUri(uri);
+ return request;
+ }
+
+ @Override
+ public SendRtpItem getSendRtpItem(String sendRtpItemKey) {
+ RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
+ RedisRpcResponse response = redisRpcConfig.request(request, 10);
+ if (response.getBody() == null) {
+ return null;
+ }
+ return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString());
+ }
+
+ @Override
+ public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
+ logger.info("[璇锋眰鍏朵粬WVP] 寮�濮嬫帹娴侊紝wvp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+ RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
+ request.setToId(sendRtpItem.getServerId());
+ RedisRpcResponse response = redisRpcConfig.request(request, 10);
+ return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+ }
+
+ @Override
+ public WVPResult stopSendRtp(String sendRtpItemKey) {
+ SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
+ if (sendRtpItem == null) {
+ logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+ return WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彂娴佷俊鎭�");
+ }
+ logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦锛寃vp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+ RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey);
+ request.setToId(sendRtpItem.getServerId());
+ RedisRpcResponse response = redisRpcConfig.request(request, 10);
+ return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+ }
+
+ @Override
+ public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
+ logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+ RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
+ request.setToId(sendRtpItem.getServerId());
+ hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+
+ // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
+ if (sendRtpItem.getSsrc() == null) {
+ // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡
+ String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+ sendRtpItem.setSsrc(ssrc);
+ }
+ sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+ sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+ sendRtpItem.setServerId(userSetting.getServerId());
+ redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+ if (callback != null) {
+ callback.run(sendRtpItem.getRedisKey());
+ }
+ hookSubscribe.removeSubscribe(hook);
+ redisRpcConfig.removeCallback(request.getSn());
+ });
+
+ redisRpcConfig.request(request, response -> {
+ if (response.getBody() == null) {
+ logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾�,浣嗘槸鏈壘鍒板彂娴佷俊鎭細{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ return;
+ }
+ logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾� {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
+
+ if (callback != null) {
+ callback.run(response.getBody().toString());
+ }
+ hookSubscribe.removeSubscribe(hook);
+ });
+ return request.getSn();
+ }
+
+ @Override
+ public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
+ logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
+ HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+ sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+ hookSubscribe.removeSubscribe(hook);
+ RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
+ request.setToId(sendRtpItem.getServerId());
+ redisRpcConfig.request(request, 10);
+ }
+
+ @Override
+ public void rtpSendStopped(String sendRtpItemKey) {
+ SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
+ if (sendRtpItem == null) {
+ logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey);
+ return;
+ }
+ RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey);
+ request.setToId(sendRtpItem.getServerId());
+ redisRpcConfig.request(request, 10);
+ }
+
+ @Override
+ public void removeCallback(long key) {
+ redisRpcConfig.removeCallback(key);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 1233623..21911b9 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -45,6 +45,8 @@
void updateSendRTPSever(SendRtpItem sendRtpItem);
+ List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId);
+
/**
* 鏌ヨRTP鎺ㄩ�佷俊鎭紦瀛�
* @param platformGbId
@@ -197,6 +199,8 @@
void addDiskInfo(List<Map<String, Object>> diskInfo);
+ void deleteSendRTPServer(SendRtpItem sendRtpItem);
+
List<SendRtpItem> queryAllSendRTPServer();
List<Device> getAllDevices();
@@ -209,7 +213,7 @@
void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
- void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
+ void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
void addPushListItem(String app, String stream, MediaArrivalEvent param);
@@ -219,4 +223,11 @@
void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
+ void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
+
+ SendRtpItem getWaiteSendRtpItem(String app, String stream);
+
+ void sendStartSendRtp(SendRtpItem sendRtpItem);
+
+ void sendPushStreamOnline(SendRtpItem sendRtpItem);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
index c03d73a..10d0ee1 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -52,8 +52,8 @@
"<if test='status != null'>, status=#{status}</if>" +
"<if test='streamId != null'>, stream_id=#{streamId}</if>" +
"<if test='hasAudio != null'>, has_audio=#{hasAudio}</if>" +
- ", custom_longitude=#{longitude}" +
- ", custom_latitude=#{latitude}" +
+ "<if test='customLongitude != null'>, custom_longitude=#{customLongitude}</if>" +
+ "<if test='customLatitude != null'>, custom_latitude=#{customLatitude}</if>" +
"<if test='longitudeGcj02 != null'>, longitude_gcj02=#{longitudeGcj02}</if>" +
"<if test='latitudeGcj02 != null'>, latitude_gcj02=#{latitudeGcj02}</if>" +
"<if test='longitudeWgs84 != null'>, longitude_wgs84=#{longitudeWgs84}</if>" +
@@ -89,8 +89,10 @@
"dc.password, " +
"COALESCE(dc.custom_ptz_type, dc.ptz_type) AS ptz_type, " +
"dc.status, " +
- "COALESCE(dc.custom_longitude, dc.longitude) AS longitude, " +
- "COALESCE(dc.custom_latitude, dc.latitude) AS latitude, " +
+ "dc.longitude, " +
+ "dc.latitude, " +
+ "dc.custom_longitude, " +
+ "dc.custom_latitude, " +
"dc.stream_id, " +
"dc.device_id, " +
"dc.parental, " +
@@ -345,6 +347,8 @@
"<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" +
"<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
"<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
+ "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
+ "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
"<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
"<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
"<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
@@ -397,6 +401,23 @@
" </script>"})
int updatePosition(DeviceChannel deviceChannel);
+ @Update({"<script>" +
+ "<foreach collection='deviceChannelList' item='item' separator=';'>" +
+ " UPDATE" +
+ " wvp_device_channel" +
+ " SET gps_time=#{item.gpsTime}" +
+ "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
+ "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
+ "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
+ "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
+ "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
+ "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
+ "WHERE device_id=#{item.deviceId} " +
+ " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
+ "</foreach>" +
+ "</script>"})
+ int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
+
@Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
List<DeviceChannel> getAllChannelInPlay();
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
index 7bf243c..c28b16e 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -33,4 +33,33 @@
@Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
int clearMobilePositionsByDeviceId(String deviceId);
+
+ @Insert("<script> " +
+ "insert into wvp_device_mobile_position " +
+ "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+ "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+ "values " +
+ "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+ "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+ "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+ "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+ "#{item.createTime}) " +
+ "</foreach> " +
+ "</script>")
+ void batchadd2(List<MobilePosition> mobilePositions);
+
+ @Insert("<script> " +
+ "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+ "insert into wvp_device_mobile_position " +
+ "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+ "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+ "values " +
+ "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+ "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+ "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+ "#{item.createTime}); " +
+ "</foreach> " +
+ "</script>")
+ void batchadd(List<MobilePosition> mobilePositions);
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 51878c7..6388932 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -12,6 +12,8 @@
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -146,15 +148,26 @@
@Override
public void updateSendRTPSever(SendRtpItem sendRtpItem) {
+ redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
+ }
- String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
- userSetting.getServerId() + "_"
- + sendRtpItem.getMediaServerId() + "_"
- + sendRtpItem.getPlatformId() + "_"
- + sendRtpItem.getChannelId() + "_"
- + sendRtpItem.getStream() + "_"
- + sendRtpItem.getCallId();
- redisTemplate.opsForValue().set(key, sendRtpItem);
+ @Override
+ public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
+ String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ + userSetting.getServerId() + "_*_"
+ + platformGbId + "_"
+ + channelId + "_"
+ + streamId + "_"
+ + "*";
+ List<SendRtpItem> result = new ArrayList<>();
+ List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
+ if (!scan.isEmpty()) {
+ for (Object o : scan) {
+ String key = (String) o;
+ result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
+ }
+ }
+ return result;
}
@Override
@@ -172,7 +185,7 @@
callId = "*";
}
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
- + userSetting.getServerId() + "_*_"
+ + "*_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
@@ -268,9 +281,18 @@
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
for (Object keyStr : scan) {
+ logger.info("[鍒犻櫎 redis鐨凷endRTP]锛� {}", keyStr.toString());
redisTemplate.delete(keyStr);
}
}
+ }
+
+ /**
+ * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛�
+ */
+ @Override
+ public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
+ deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
}
@Override
@@ -555,7 +577,7 @@
@Override
public void sendMobilePositionMsg(JSONObject jsonObject) {
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
- logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString());
+// logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString());
redisTemplate.convertAndSend(key, jsonObject);
}
@@ -646,9 +668,15 @@
}
@Override
- public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
+ public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
+
+ MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+ sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
+ msg.setPlatFormIndex(platform.getId());
+
String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
- logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
+ logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
}
@@ -681,4 +709,30 @@
logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鍋滄鍚戜笂绾ф帹娴� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
}
+
+ @Override
+ public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
+ String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
+ redisTemplate.opsForValue().set(key, sendRtpItem);
+ }
+
+ @Override
+ public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
+ String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
+ return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
+ }
+
+ @Override
+ public void sendStartSendRtp(SendRtpItem sendRtpItem) {
+ String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
+ logger.info("[redis鍙戦�侀�氱煡] 閫氱煡鍏朵粬WVP鎺ㄦ祦 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+ redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
+ }
+
+ @Override
+ public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
+ String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
+ logger.info("[redis鍙戦�侀�氱煡] 娴佷笂绾� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+ redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
index 7ad595d..59a6943 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.service.ILogService;
import com.genersoft.iot.vmp.storager.dao.dto.LogDto;
@@ -92,4 +93,14 @@
logService.clear();
}
+ @Autowired
+ private RedisRpcConfig redisRpcConfig;
+
+ @GetMapping("/test/count")
+ public Object count() {
+ return redisRpcConfig.getCallbackCount();
+ }
+
+
+
}
diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue
index 02e92e7..ef49205 100755
--- a/web_src/src/components/channelList.vue
+++ b/web_src/src/components/channelList.vue
@@ -326,7 +326,9 @@
e.ptzType = e.ptzType + "";
that.$set(e, "edit", false);
that.$set(e, "location", "");
- if (e.longitude && e.latitude) {
+ if (e.customLongitude && e.customLatitude) {
+ that.$set(e, "location", e.customLongitude + "," + e.customLatitude);
+ }else if (e.longitude && e.latitude) {
that.$set(e, "location", e.longitude + "," + e.latitude);
}
});
@@ -481,7 +483,9 @@
e.ptzType = e.ptzType + "";
this.$set(e, "edit", false);
this.$set(e, "location", "");
- if (e.longitude && e.latitude) {
+ if (e.customLongitude && e.customLatitude) {
+ this.$set(e, "location", e.customLongitude + "," + e.customLatitude);
+ }else if (e.longitude && e.latitude) {
this.$set(e, "location", e.longitude + "," + e.latitude);
}
});
@@ -603,8 +607,8 @@
this.$message.warning("浣嶇疆淇℃伅鏍煎紡鏈夎锛屼緥锛�117.234,36.378");
return;
} else {
- row.longitude = parseFloat(segements[0]);
- row.latitude = parseFloat(segements[1]);
+ row.customLongitude = parseFloat(segements[0]);
+ row.custom_latitude = parseFloat(segements[1]);
if (!(row.longitude && row.latitude)) {
this.$message.warning("浣嶇疆淇℃伅鏍煎紡鏈夎锛屼緥锛�117.234,36.378");
return;
diff --git "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql" "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql"
index c229fb1..b14a5c8 100644
--- "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql"
+++ "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql"
@@ -4,5 +4,15 @@
alter table wvp_device
drop switch_primary_sub_stream;
+# 绗竴涓ˉ涓佸寘
alter table wvp_platform
- add send_stream_ip character varying(50);
\ No newline at end of file
+ add send_stream_ip character varying(50);
+
+alter table wvp_device
+ change on_line on_line bool default false;
+
+alter table wvp_device
+ change id id serial primary key;
+
+alter table wvp_device
+ change ssrc_check ssrc_check bool default false;
\ No newline at end of file
diff --git "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql" "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql"
index c229fb1..b14a5c8 100644
--- "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql"
+++ "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql"
@@ -4,5 +4,15 @@
alter table wvp_device
drop switch_primary_sub_stream;
+# 绗竴涓ˉ涓佸寘
alter table wvp_platform
- add send_stream_ip character varying(50);
\ No newline at end of file
+ add send_stream_ip character varying(50);
+
+alter table wvp_device
+ change on_line on_line bool default false;
+
+alter table wvp_device
+ change id id serial primary key;
+
+alter table wvp_device
+ change ssrc_check ssrc_check bool default false;
\ No newline at end of file
--
Gitblit v1.8.0