From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 23 四月 2024 20:59:20 +0800 Subject: [PATCH] Merge branch 'refs/heads/2.7.0' --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 20 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 17 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java | 24 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 65 + src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java | 9 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java | 60 + src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java | 11 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 29 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java | 2 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java | 99 + 数据库/2.7.0/更新-mysql-2.7.0.sql | 12 src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java | 30 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 78 + src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 22 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java | 22 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 13 src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java | 2 src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 1 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 4 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java | 4 数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql | 12 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 3 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java | 93 + src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 363 +---- src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java | 7 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 12 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java | 225 ++++ src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java | 0 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 199 +++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 23 src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 3 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 511 ++++----- /dev/null | 97 - web_src/src/components/channelList.vue | 12 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 357 ++++++ src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java | 304 +++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 276 ++-- src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java | 155 ++ src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java | 9 src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 1 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 29 42 files changed, 2,374 insertions(+), 845 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index df230d4..c4911e4 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -72,6 +72,9 @@ public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_"; public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; + public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; + public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; + public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a9f5c88..a9b17ae 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -66,7 +66,7 @@ private List<String> allowedOrigins = new ArrayList<>(); - private int maxNotifyCountQueue = 10000; + private int maxNotifyCountQueue = 100000; private int registerAgainAfterTime = 60; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index c14ebcd..dcf2830 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -29,25 +29,21 @@ private RedisAlarmMsgListener redisAlarmMsgListener; @Autowired - private RedisStreamMsgListener redisStreamMsgListener; - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - - @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private RedisCloseStreamMsgListener redisCloseStreamMsgListener; + @Autowired - private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; + private RedisRpcConfig redisRpcConfig; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamCloseResponseListener; /** @@ -64,13 +60,11 @@ container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); - container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); - container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); - container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); - container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); + container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); + container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java new file mode 100644 index 0000000..661e370 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -0,0 +1,225 @@ +package com.genersoft.iot.vmp.conf.redis; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +@Component +public class RedisRpcConfig implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class); + + public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY"; + + private final Random random = new Random(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisRpcController redisRpcController; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] pattern) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class); + if (redisRpcMessage.getRequest() != null) { + handlerRequest(redisRpcMessage.getRequest()); + } else if (redisRpcMessage.getResponse() != null){ + handlerResponse(redisRpcMessage.getResponse()); + } else { + logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage)); + } + } catch (Exception e) { + logger.error("[redis rpc 瑙f瀽寮傚父] ", e); + } + } + }); + } + } + + private void handlerResponse(RedisRpcResponse response) { + if (userSetting.getServerId().equals(response.getToId())) { + return; + } + logger.info("[redis-rpc] << {}", response); + response(response); + } + + private void handlerRequest(RedisRpcRequest request) { + try { + if (userSetting.getServerId().equals(request.getFromId())) { + return; + } + logger.info("[redis-rpc] << {}", request); + Method method = getMethod(request.getUri()); + // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404 + if (userSetting.getServerId().equals(request.getToId())) { + if (method == null) { + // 鍥炲404缁撴灉 + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(404); + sendResponse(response); + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if(response != null) { + sendResponse(response); + } + }else { + if (method == null) { + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if (response != null) { + sendResponse(response); + } + } + }catch (InvocationTargetException | IllegalAccessException e) { + logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e); + } + + } + + private Method getMethod(String name) { + // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ + Method[] methods = redisRpcController.getClass().getMethods(); + for (Method method : methods) { + if (method.getName().equals(name)) { + return method; + } + } + return null; + } + + private void sendResponse(RedisRpcResponse response){ + logger.info("[redis-rpc] >> {}", response); + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + private void sendRequest(RedisRpcRequest request){ + logger.info("[redis-rpc] >> {}", request); + RedisRpcMessage message = new RedisRpcMessage(); + message.setRequest(request); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + + private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>(); + private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>(); + + public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { + request.setSn((long) random.nextInt(1000) + 1); + SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn()); + + try { + sendRequest(request); + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); + } finally { + this.unsubscribe(request.getSn()); + } + return null; + } + + public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) { + request.setSn((long) random.nextInt(1000) + 1); + setCallback(request.getSn(), callback); + sendRequest(request); + } + + public Boolean response(RedisRpcResponse response) { + SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn()); + CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn()); + if (queue != null) { + try { + return queue.offer(response, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("{}", e.getMessage(), e); + } + }else if (callback != null) { + callback.run(response); + callbacks.remove(response.getSn()); + } + return false; + } + + private void unsubscribe(long key) { + topicSubscribers.remove(key); + } + + + private SynchronousQueue<RedisRpcResponse> subscribe(long key) { + SynchronousQueue<RedisRpcResponse> queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue<>()); + return queue; + } + + private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) { + // TODO 濡傛灉澶氫釜涓婄骇鐐规挱鍚屼竴涓�氶亾浼氭湁闂 + callbacks.put(key, callback); + } + + public void removeCallback(long key) { + callbacks.remove(key); + } + + + public int getCallbackCount(){ + return callbacks.size(); + } + +// @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆� +// public void execute(){ +// logger.info("callbacks鐨勯暱搴�: " + callbacks.size()); +// logger.info("闃熷垪鐨勯暱搴�: " + topicSubscribers.size()); +// logger.info("HOOK鐩戝惉鐨勯暱搴�: " + hookSubscribe.size()); +// logger.info(""); +// } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java new file mode 100644 index 0000000..061df6f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +public class RedisRpcMessage { + + private RedisRpcRequest request; + + private RedisRpcResponse response; + + public RedisRpcRequest getRequest() { + return request; + } + + public void setRequest(RedisRpcRequest request) { + this.request = request; + } + + public RedisRpcResponse getResponse() { + return response; + } + + public void setResponse(RedisRpcResponse response) { + this.response = response; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java new file mode 100644 index 0000000..a02db67 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java @@ -0,0 +1,93 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +/** + * 閫氳繃redis鍙戦�佽姹� + */ +public class RedisRpcRequest { + + /** + * 鏉ヨ嚜鐨刉VP ID + */ + private String fromId; + + + /** + * 鐩爣鐨刉VP ID + */ + private String toId; + + /** + * 搴忓垪鍙� + */ + private long sn; + + /** + * 璁块棶鐨勮矾寰� + */ + private String uri; + + /** + * 鍙傛暟 + */ + private Object param; + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public Object getParam() { + return param; + } + + public void setParam(Object param) { + this.param = param; + } + + public long getSn() { + return sn; + } + + public void setSn(long sn) { + this.sn = sn; + } + + @Override + public String toString() { + return "RedisRpcRequest{" + + "uri='" + uri + '\'' + + ", fromId='" + fromId + '\'' + + ", toId='" + toId + '\'' + + ", sn=" + sn + + ", param=" + param + + '}'; + } + + public RedisRpcResponse getResponse() { + RedisRpcResponse response = new RedisRpcResponse(); + response.setFromId(fromId); + response.setToId(toId); + response.setSn(sn); + response.setUri(uri); + return response; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java new file mode 100644 index 0000000..21f9e7e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java @@ -0,0 +1,99 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +/** + * 閫氳繃redis鍙戦�佸洖澶� + */ +public class RedisRpcResponse { + + /** + * 鏉ヨ嚜鐨刉VP ID + */ + private String fromId; + + + /** + * 鐩爣鐨刉VP ID + */ + private String toId; + + + /** + * 搴忓垪鍙� + */ + private long sn; + + /** + * 鐘舵�佺爜 + */ + private int statusCode; + + /** + * 璁块棶鐨勮矾寰� + */ + private String uri; + + /** + * 鍙傛暟 + */ + private Object body; + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public long getSn() { + return sn; + } + + public void setSn(long sn) { + this.sn = sn; + } + + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public Object getBody() { + return body; + } + + public void setBody(Object body) { + this.body = body; + } + + @Override + public String toString() { + return "RedisRpcResponse{" + + "uri='" + uri + '\'' + + ", fromId='" + fromId + '\'' + + ", toId='" + toId + '\'' + + ", sn=" + sn + + ", statusCode=" + statusCode + + ", body=" + body + + '}'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 32b6fac..8290a45 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -187,6 +187,18 @@ private double latitude; /** + * 缁忓害 + */ + @Schema(description = "鑷畾涔夌粡搴�") + private double customLongitude; + + /** + * 绾害 + */ + @Schema(description = "鑷畾涔夌含搴�") + private double customLatitude; + + /** * 缁忓害 GCJ02 */ @Schema(description = "GCJ02鍧愭爣绯荤粡搴�") @@ -226,7 +238,7 @@ * 鏄惁鍚湁闊抽 */ @Schema(description = "鏄惁鍚湁闊抽") - private boolean hasAudio; + private Boolean hasAudio; /** * 鏍囪閫氶亾鐨勭被鍨嬶紝0->鍥芥爣閫氶亾 1->鐩存挱娴侀�氶亾 2->涓氬姟鍒嗙粍/铏氭嫙缁勭粐/琛屾斂鍖哄垝 @@ -586,4 +598,20 @@ public void setStreamIdentification(String streamIdentification) { this.streamIdentification = streamIdentification; } + + public double getCustomLongitude() { + return customLongitude; + } + + public void setCustomLongitude(double customLongitude) { + this.customLongitude = customLongitude; + } + + public double getCustomLatitude() { + return customLatitude; + } + + public void setCustomLatitude(double customLatitude) { + this.customLatitude = customLatitude; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c133c82..55f09df 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -2,6 +2,8 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; +import com.genersoft.iot.vmp.common.VideoManagerConstants; + public class SendRtpItem { /** @@ -23,6 +25,11 @@ * 骞冲彴id */ private String platformId; + + /** + * 骞冲彴鍚嶇О + */ + private String platformName; /** * 瀵瑰簲璁惧id @@ -64,6 +71,11 @@ private boolean tcpActive; /** + * 鑷繁鎺ㄦ祦浣跨敤鐨処P + */ + private String localIp; + + /** * 鑷繁鎺ㄦ祦浣跨敤鐨勭鍙� */ private int localPort; @@ -81,7 +93,7 @@ /** * invite 鐨� callId */ - private String CallId; + private String callId; /** * invite 鐨� fromTag @@ -124,6 +136,11 @@ */ private String receiveStream; + /** + * 涓婄骇鐨勭偣鎾被鍨� + */ + private String sessionName; + public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) { SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId()); @@ -138,7 +155,7 @@ sendRtpItem.setUsePs(requestPushStreamMsg.isPs()); sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio()); return sendRtpItem; - + } public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) { @@ -262,11 +279,11 @@ } public String getCallId() { - return CallId; + return callId; } public void setCallId(String callId) { - CallId = callId; + this.callId = callId; } public InviteStreamType getPlayType() { @@ -341,6 +358,30 @@ this.receiveStream = receiveStream; } + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + public String getLocalIp() { + return localIp; + } + + public void setLocalIp(String localIp) { + this.localIp = localIp; + } + + public String getSessionName() { + return sessionName; + } + + public void setSessionName(String sessionName) { + this.sessionName = sessionName; + } + @Override public String toString() { return "SendRtpItem{" + @@ -348,6 +389,7 @@ ", port=" + port + ", ssrc='" + ssrc + '\'' + ", platformId='" + platformId + '\'' + + ", platformName='" + platformName + '\'' + ", deviceId='" + deviceId + '\'' + ", app='" + app + '\'' + ", channelId='" + channelId + '\'' + @@ -355,10 +397,11 @@ ", stream='" + stream + '\'' + ", tcp=" + tcp + ", tcpActive=" + tcpActive + + ", localIp='" + localIp + '\'' + ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + - ", CallId='" + CallId + '\'' + + ", CallId='" + callId + '\'' + ", fromTag='" + fromTag + '\'' + ", toTag='" + toTag + '\'' + ", pt=" + pt + @@ -367,6 +410,18 @@ ", rtcp=" + rtcp + ", playType=" + playType + ", receiveStream='" + receiveStream + '\'' + + ", sessionName='" + sessionName + '\'' + '}'; } + + public String getRedisKey() { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + serverId + "_" + + mediaServerId + "_" + + platformId + "_" + + channelId + "_" + + stream + "_" + + callId; + return key; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 557563e..18ad2b0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -49,6 +49,7 @@ ParentPlatform parentPlatform = null; Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>(); + Map<String, DeviceChannel> channelMap = new HashMap<>(); if (!ObjectUtils.isEmpty(event.getPlatformId())) { subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { @@ -67,6 +68,7 @@ for (DeviceChannel deviceChannel : event.getDeviceChannels()) { List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms); parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB); + channelMap.put(deviceChannel.getChannelId(), deviceChannel); } } }else if (event.getGbStreams() != null) { @@ -174,7 +176,7 @@ } logger.info("[Catalog浜嬩欢: {}]骞冲彴锛歿}锛屽奖鍝嶉�氶亾{}", event.getType(), platform.getServerGBId(), gbId); List<DeviceChannel> deviceChannelList = new ArrayList<>(); - DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId); + DeviceChannel deviceChannel = channelMap.get(gbId); deviceChannelList.add(deviceChannel); GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId); if(gbStream != null){ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 1c8353d..9fb1d4d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -592,6 +592,7 @@ Integer finalIndex = index; String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, deviceChannels.size(), type, subscribeInfo); + System.out.println(catalogXmlContent); logger.info("[鍙戦�丯OTIFY閫氱煡]绫诲瀷锛� {}锛屽彂閫佹暟閲忥細 {}", type, channels.size()); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { logger.error("鍙戦�丯OTIFY閫氱煡娑堟伅澶辫触銆傞敊璇細{} {}", eventResult.statusCode, eventResult.msg); @@ -621,7 +622,6 @@ private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) { StringBuffer catalogXml = new StringBuffer(600); - String characterSet = parentPlatform.getCharacterSet(); catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n") .append("<Notify>\r\n") @@ -660,6 +660,8 @@ .append("<Owner> " + channel.getOwner()+ "</Owner>\r\n") .append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n") .append("<Address>" + channel.getAddress() + "</Address>\r\n"); + catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n"); + catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n"); } if (!"presence".equals(subscribeInfo.getEventType())) { catalogXml.append("<Event>" + type + "</Event>\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index 7cbfe70..f3f7431 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -6,7 +6,6 @@ import com.google.common.primitives.Bytes; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -172,6 +171,7 @@ return getRootElement(evt, "gb2312"); } public Element getRootElement(RequestEvent evt, String charset) throws DocumentException { + if (charset == null) { charset = "gb2312"; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 10922f4..b76751c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -13,10 +13,11 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -51,6 +52,8 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IRedisRpcService redisRpcService; @Autowired private UserSetting userSetting; @@ -68,9 +71,6 @@ private DynamicTask dynamicTask; @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - - @Autowired private IPlayService playService; @@ -86,7 +86,7 @@ logger.info("[鏀跺埌ACK]锛� 鏉ヨ嚜->{}", fromUserId); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); if (sendRtpItem == null) { - logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛岀洰鏍囦负({})鐨勬帹娴佷俊鎭�",fromUserId, toUserId); + logger.warn("[鏀跺埌ACK]锛氭湭鎵惧埌鏉ヨ嚜{}锛宑allId: {}", fromUserId, callIdHeader.getCallId()); return; } // tcp涓诲姩鏃讹紝姝ゆ椂鏄骇鑱斾笅绾у钩鍙帮紝鍦ㄥ洖澶�200ok鏃讹紝鏈湴宸茬粡璇锋眰zlm寮�鍚洃鍚紝璺宠繃涓嬮潰姝ラ @@ -106,10 +106,13 @@ if (parentPlatform != null) { if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> { - playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader); - }); + WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem); + if (wvpResult.getCode() == 0) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> { + playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader); + }); + } } else { try { if (sendRtpItem.isTcpActive()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 302b694..b8eb703 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -16,10 +16,10 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -91,7 +91,8 @@ private IStreamPushService pushService; @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; + private IRedisRpcService redisRpcService; + @Override public void afterPropertiesSet() throws Exception { @@ -138,17 +139,8 @@ if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } - - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(platform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - }else { - logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId()); - } + }else { + logger.info("[涓婄骇骞冲彴鍋滄瑙傜湅] 鏈壘鍒板钩鍙皗}鐨勪俊鎭紝鍙戦�乺edis娑堟伅澶辫触", sendRtpItem.getPlatformId()); } }else { MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 46e779d..f1f277f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -31,11 +31,17 @@ import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -51,6 +57,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.sdp.*; @@ -64,6 +71,7 @@ import java.util.Map; import java.util.Random; import java.util.Vector; +import java.util.*; /** * SIP鍛戒护绫诲瀷锛� INVITE璇锋眰 @@ -92,16 +100,16 @@ private IRedisCatchStorage redisCatchStorage; @Autowired - private IInviteStreamService inviteStreamService; + private IRedisRpcService redisRpcService; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; @Autowired private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; - - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -125,17 +133,16 @@ private UserSetting userSetting; @Autowired - private ZLMMediaListManager mediaListManager; - - @Autowired private SipConfig config; - - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; @Autowired private VideoStreamSessionManager streamSession; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Override @@ -552,43 +559,79 @@ } } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); + SendRtpItem sendRtpItem = new SendRtpItem(); + if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { + sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setPlatformName(platform.getName()); + sendRtpItem.setPlatformId(platform.getServerGBId()); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setIp(addressStr); + sendRtpItem.setPort(port); + sendRtpItem.setUsePs(true); + sendRtpItem.setApp(gbStream.getApp()); + sendRtpItem.setStream(gbStream.getStream()); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setOnlyAudio(false); + sendRtpItem.setStatus(0); + sendRtpItem.setSessionName(sessionName); + // 娓呯悊鍙兘瀛樺湪鐨勭紦瀛橀伩鍏嶇敤鍒版棫鐨勬暟鎹� + List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream()); + if (!sendRtpItemList.isEmpty()) { + for (SendRtpItem rtpItem : sendRtpItemList) { + redisCatchStorage.deleteSendRTPServer(rtpItem); + } + } if ("push".equals(gbStream.getStreamType())) { + sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); if (pushListItem != null) { - pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId())); - // 鎺ㄦ祦鐘舵�� - pushStream(evt, request, gbStream, pushListItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + + StreamPushItem transform = streamPushService.transform(pushListItem); + transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // 寮�濮嬫帹娴� + sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { - // 鏈帹娴� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if (!platform.isStartOfflinePush()) { + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + return; + } + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } if (proxyByAppAndStream.isStatus()) { - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //寮�鍚唬鐞嗘媺娴� - notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } - - } } } @@ -614,58 +657,13 @@ /** * 瀹夋帓鎺ㄦ祦 */ - private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServer, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream()); + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); - } - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); - - SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - } - - } - - private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - // 鎺ㄦ祦 - if (streamPushItem.isSelf()) { - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (streamReady != null && streamReady) { - // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); @@ -674,226 +672,197 @@ } return; } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); + sendRtpItem.setPlayType(InviteStreamType.PROXY); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + + SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + } + + private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // 鎺ㄦ祦 + if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (streamReady != null && streamReady) { + // 鑷钩鍙板唴瀹� + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + } + return; } - sendRtpItem.setPlayType(InviteStreamType.PUSH); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } - } else { // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + otherWvpPushStream(sendRtpItem, request, platform); } } /** * 閫氱煡娴佷笂绾� */ - private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - // 鐩戝惉娴佷笂绾� - Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId()); - this.hookSubscribe.addSubscribe(hook, (hookData) -> { - logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", hookData.getApp(), hookData.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - this.hookSubscribe.removeSubscribe(hook); - }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - this.hookSubscribe.removeSubscribe(hook); - dynamicTask.stop(callIdHeader.getCallId()); + private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鐩戝惉娴佷笂绾� + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + dynamicTask.stop(sendRtpItem.getCallId()); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); + }); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + }, userSetting.getPlatformPlayTimeout()); + boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (!start) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } - } else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 - try { - logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), null, gbStream.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - }, userSetting.getPlatformPlayTimeout()); - // 娣诲姞鐩戝惉 - int finalPort = port; - Boolean finalTcpActive = tcpActive; - - // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }); - - // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); - } - } - }); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(sendRtpItem.getCallId()); } } /** - * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + * 閫氱煡娴佷笂绾� */ - private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); - // 鍙戦�乺edis娑堟伅 - redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), - streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> { - SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); - if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - // 鏀跺埌sendItem - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }, (wvpResult) -> { - - // 閿欒 - if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { - // 绂荤嚎 - // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isPushIng()) { - // 鍦ㄧ嚎鐘舵�� - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - - } else { - // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - } + private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + }, userSetting.getPlatformPlayTimeout()); + // + long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + if (sendRtpItemKey == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦寰楀埌缁撴灉鏈┖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItemFromRedis == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦, 鏈壘鍒皉edis涓紦瀛樼殑鍙戞祦淇℃伅锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { + logger.info("[绾ц仈鐐规挱] 绛夊緟鐨勬帹娴佸湪鏈钩鍙颁笂绾� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - }); + return; + } + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); + } + + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } else { + // 鍏朵粬骞冲彴鍐呭 + otherWvpPushStream(sendRtpItemFromRedis, request, platform); + } + }); + // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 + // redis娑堟伅渚嬪锛� PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"澶辫触","app":"1","stream":"2"}' + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(sendRtpItem.getCallId()); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.removeCallback(key); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); + } + } + }); } - public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { - String sdpIp = mediaServerItem.getSdpIp(); + + /** + * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + */ + private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { + logger.info("[绾ц仈鐐规挱] 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); + if (sendRtpItem == null) { + return; + } + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + + public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { + + String sdpIp = sendRtpItem.getLocalIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sdpIp = platform.getSendStreamIp(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index cd97786..de93804 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -20,10 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +37,6 @@ private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); @@ -60,275 +56,110 @@ private IDeviceChannelService deviceChannelService; @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired private SipConfig sipConfig; - private final static String talkKey = "notify-request-for-catalog-task"; + @Transactional + public void process(List<RequestEvent> evtList) { + if (evtList.isEmpty()) { + return; + } + for (RequestEvent evt : evtList) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - public void process(RequestEvent evt) { - try { - long start = System.currentTimeMillis(); - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); - if (channel == null) { - logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 涓婄嚎 - logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - updateChannelOnlineList.add(channel); - if (updateChannelOnlineList.size() > 300) { - executeSaveForOnline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); - } - - break; - case CatalogEvent.OFF : - // 绂荤嚎 - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.VLOST: - // 瑙嗛涓㈠け - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.DEFECT: - // 鏁呴殰 - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[鏀跺埌閫氶亾瑙嗛鏁呴殰閫氱煡] 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.ADD: - // 澧炲姞 - logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannel != null) { - logger.info("[澧炲姞閫氶亾] 宸插瓨鍦紝涓嶅彂閫侀�氱煡鍙洿鏂帮紝璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - channel.setId(deviceChannel.getId()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - } - - break; - case CatalogEvent.DEL: - // 鍒犻櫎 - logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deleteChannelList.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); - } - if (deleteChannelList.size() > 300) { - executeSaveForDelete(); - } - break; - case CatalogEvent.UPDATE: - // 鏇存柊 - logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - // 鍒ゆ柇姝ら�氶亾鏄惁瀛樺湪 - DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannelForUpdate != null) { - channel.setId(deviceChannelForUpdate.getId()); - channel.setUpdateTime(DateUtil.getNow()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - if (userSetting.getDeviceStatusNotify()) { - // 鍙戦�乺edis娑堟伅 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - break; - default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, event); - - if (!updateChannelMap.keySet().isEmpty() - || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() - || !deleteChannelList.isEmpty()) { - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + // 閬嶅巻DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); } + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); + if (channel == null) { + logger.info("[鏀跺埌鐩綍璁㈤槄]锛氫絾鏄В鏋愬け璐� {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + channel.setDeviceId(device.getDeviceId()); + logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 涓婄嚎 + deviceChannelService.online(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + + break; + case CatalogEvent.OFF : + case CatalogEvent.VLOST: + case CatalogEvent.DEFECT: + // 绂荤嚎 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[鐩綍璁㈤槄] 绂荤嚎 浣嗘槸骞冲彴宸查厤缃嫆缁濇娑堟伅锛屾潵鑷澶�: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); + }else { + deviceChannelService.offline(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEL: + // 鍒犻櫎 + deviceChannelService.delete(channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } + break; + case CatalogEvent.ADD: + case CatalogEvent.UPDATE: + // 鏇存柊 + channel.setUpdateTime(DateUtil.getNow()); + deviceChannelService.updateChannel(deviceId,channel); + if (userSetting.getDeviceStatusNotify()) { + // 鍙戦�乺edis娑堟伅 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); + + } + // 杞彂鍙樺寲淇℃伅 + eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } } - - private void executeSave(){ - try { - executeSaveForAdd(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForOnline(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾涓婄嚎] 寮傚父锛� ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e ); - } - dynamicTask.stop(talkKey); - } - - private void executeSaveForUpdate(){ - if (!updateChannelMap.values().isEmpty()) { - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - updateChannelMap.clear(); - deviceChannelService.batchUpdateChannel(deviceChannels); - } - - } - - private void executeSaveForAdd(){ - if (!addChannelMap.values().isEmpty()) { - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); - addChannelMap.clear(); - deviceChannelService.batchAddChannel(deviceChannels); - } - } - - private void executeSaveForDelete(){ - if (!deleteChannelList.isEmpty()) { - deviceChannelService.deleteChannels(deleteChannelList); - deleteChannelList.clear(); - } - } - - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnline(updateChannelOnlineList); - updateChannelOnlineList.clear(); - } - } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOffline(updateChannelOfflineList); - updateChannelOfflineList.clear(); - } - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java new file mode 100755 index 0000000..05a5336 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -0,0 +1,199 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.ObjectUtils; + +import javax.sip.RequestEvent; +import javax.sip.header.FromHeader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * SIP鍛戒护绫诲瀷锛� NOTIFY璇锋眰涓殑绉诲姩浣嶇疆璇锋眰澶勭悊 + */ +@Component +public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent { + + + private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); + + + @Autowired + private UserSetting userSetting; + + @Autowired + private EventPublisher eventPublisher; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IDeviceChannelService deviceChannelService; + + @Transactional + public void process(List<RequestEvent> eventList) { + if (eventList.isEmpty()) { + return; + } + Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); + List<MobilePosition> addMobilePositionList = new ArrayList<>(); + for (RequestEvent evt : eventList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + long startTime = System.currentTimeMillis(); + // 鍥炲 200 OK + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); + return; + } + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒癲evice,{}", deviceId); + return; + } + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setDeviceName(device.getName()); + mobilePosition.setCreateTime(DateUtil.getNow()); + List<Element> elements = rootElement.elements(); + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + String channelId = element.getStringValue(); + if (!deviceId.equals(channelId)) { + mobilePosition.setChannelId(channelId); + } + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; + + } + } + +// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); + mobilePosition.setReportSource("Mobile Position"); + + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); + addMobilePositionList.add(mobilePosition); + + + // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� + try { + eventPublisher.mobilePositionEventPublish(mobilePosition); + }catch (Exception e) { + logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e); + } + if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); + channels.forEach(channel -> { + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", channel.getDeviceId()); + jsonObject.put("code", channel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + }); + }else { + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", mobilePosition.getDeviceId()); + jsonObject.put("code", mobilePosition.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + } + if(!updateChannelMap.isEmpty()) { + List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); + logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size()); + deviceChannelService.batchUpdateChannelGPS(channels); + updateChannelMap.clear(); + } + if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { + try { + logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); + deviceChannelService.batchAddMobilePosition(addMobilePositionList); + }catch (Exception e) { + logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); + } + addMobilePositionList.clear(); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index e54aa2d..8a42624 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -25,6 +25,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -35,6 +37,7 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -76,6 +79,9 @@ @Autowired private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + @Autowired + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -97,61 +103,73 @@ responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); return; - }else { + } else { responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); } - }catch (SipException | InvalidArgumentException | ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); - logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); - if (!runed) { - taskExecutor.execute(()-> { - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - notifyRequestForCatalogProcessor.process(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } catch (DocumentException e) { - logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); - } + } + @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + try { + List<RequestEvent> catalogEventList = new ArrayList<>(); + List<RequestEvent> alarmEventList = new ArrayList<>(); + List<RequestEvent> mobilePositionEventList = new ArrayList<>(); + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; } - }); + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + catalogEventList.add(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + alarmEventList.add(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + mobilePositionEventList.add(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } + } + taskQueue.clear(); + if (!alarmEventList.isEmpty()) { + processNotifyAlarm(alarmEventList); + } + if (!catalogEventList.isEmpty()) { + notifyRequestForCatalogProcessor.process(catalogEventList); + } + if (!mobilePositionEventList.isEmpty()) { + notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); + } + } catch (DocumentException e) { + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); } } + + /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify * * @param evt */ - private void processNotifyMobilePosition(RequestEvent evt) { + @Async("taskExecutor") + public void processNotifyMobilePosition(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -179,6 +197,13 @@ if (device == null) { logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); return; + } + // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 + if(deviceId.equals(channelId)) { + List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } } if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); @@ -210,8 +235,8 @@ } else { mobilePosition.setAltitude(0.0); } - logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); // 鏇存柊device channel 鐨勭粡绾害 @@ -221,12 +246,12 @@ deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); +// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); +// +// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); +// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); +// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); +// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); @@ -237,95 +262,97 @@ /*** * 澶勭悊alarm璁惧鎶ヨNotify - * - * @param evt */ - private void processNotifyAlarm(RequestEvent evt) { + private void processNotifyAlarm(List<RequestEvent> evtList) { if (!sipConfig.isAlarm()) { return; } - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + if (!evtList.isEmpty()) { + for (RequestEvent evt : evtList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("澶勭悊alarm璁惧鎶ヨNotify鏃舵湭鑾峰彇鍒版秷鎭綋{}", evt.getRequest()); + return; + } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 鏈壘鍒拌澶囷細{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 鏇存柊device channel 鐨勭粡绾害 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 鍥炲200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } } - - // 鍥炲200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); } } @@ -353,4 +380,9 @@ public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + @Scheduled(fixedRate = 10000) //姣�1绉掓墽琛屼竴娆� + public void execute(){ + logger.info("[寰呭鐞哊otify娑堟伅鏁伴噺]: {}", taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java index 70702bb..6596f53 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java @@ -532,16 +532,17 @@ String status = getText(itemDevice, "Status"); if (status != null) { // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR鐨勫吋瀹规�у鐞� - if (status.equals("ON") || status.equals("On") || status.equals("ONLINE") || status.equals("OK")) { + if (status.equalsIgnoreCase("ON") || status.equalsIgnoreCase("On") || status.equalsIgnoreCase("ONLINE") || status.equalsIgnoreCase("OK")) { deviceChannel.setStatus(true); } - if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) { + if (status.equalsIgnoreCase("OFF") || status.equalsIgnoreCase("Off") || status.equalsIgnoreCase("OFFLINE")) { deviceChannel.setStatus(false); } }else { deviceChannel.setStatus(true); } - +// logger.info("鐘舵�佸瓧绗︿覆锛� {}", status); +// logger.info("鐘舵�佺粨鏋滐細 {}", deviceChannel.isStatus()); // 缁忓害 String longitude = getText(itemDevice, "Longitude"); if (NumericUtil.isDouble(longitude)) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index f5b1b6c..030dd7d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -5,6 +5,8 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; @@ -21,6 +23,8 @@ import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; @@ -66,6 +70,10 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IRedisRpcService redisRpcService; + @Autowired private IInviteStreamService inviteStreamService; @@ -86,9 +94,6 @@ @Autowired private EventPublisher eventPublisher; - - @Autowired - private ZLMMediaListManager zlmMediaListManager; @Autowired private HookSubscribe subscribe; @@ -117,6 +122,9 @@ @Autowired private ApplicationEventPublisher applicationEventPublisher; + + @Autowired + private IStreamPushService streamPushService; /** * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� @@ -172,6 +180,54 @@ if (mediaServer == null) { return new HookResultForOnPublish(0, "success"); } + // 鎺ㄦ祦閴存潈鐨勫鐞� + if (!"rtp".equals(param.getApp())) { + StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + if (stream != null) { + HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); + result.setEnable_audio(stream.isEnableAudio()); + result.setEnable_mp4(stream.isEnableMp4()); + return result; + } + if (userSetting.getPushAuthority()) { + // 鎺ㄦ祦閴存潈 + if (param.getParams() == null) { + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + return new HookResultForOnPublish(401, "Unauthorized"); + } + Map<String, String> paramMap = urlParamToMap(param.getParams()); + String sign = paramMap.get("sign"); + if (sign == null) { + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + return new HookResultForOnPublish(401, "Unauthorized"); + } + // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜 + String callId = paramMap.get("callId"); + // 閴存潈閰嶇疆 + boolean hasAuthority = userService.checkPushAuthority(callId, sign); + if (!hasAuthority) { + logger.info("鎺ㄦ祦閴存潈澶辫触锛� sign 鏃犳潈闄�: callId={}. sign={}", callId, sign); + return new HookResultForOnPublish(401, "Unauthorized"); + } + StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); + streamAuthorityInfo.setCallId(callId); + streamAuthorityInfo.setSign(sign); + // 閴存潈閫氳繃 + redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); + } + } else { + zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); + } + + + HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); + result.setEnable_audio(true); + taskExecutor.execute(() -> { + ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); + if (subscribe != null) { + subscribe.response(mediaInfo, param); + } + }); ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams()); if (resultForOnPublish != null) { @@ -207,6 +263,221 @@ MediaDepartureEvent mediaDepartureEvent = MediaDepartureEvent.getInstance(this, param, mediaServer); applicationEventPublisher.publishEvent(mediaDepartureEvent); } + + JSONObject json = (JSONObject) JSON.toJSON(param); + taskExecutor.execute(() -> { + ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo == null) { + logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId()); + return; + } + if (subscribe != null) { + subscribe.response(mediaInfo, param); + } + + List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks(); + // TODO 閲嶆瀯姝ゅ閫昏緫 + if (param.isRegist()) { + // 澶勭悊娴佹敞鍐岀殑閴存潈淇℃伅锛� 娴佹敞閿�杩欓噷涓嶅啀鍒犻櫎閴存潈淇℃伅锛屼笅娆℃潵浜嗘柊鐨勯壌鏉冧俊鎭細瀵瑰氨鐨勮繘琛岃鐩� + if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); + if (streamAuthorityInfo == null) { + streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); + } else { + streamAuthorityInfo.setOriginType(param.getOriginType()); + streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr()); + } + redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); + } + } + if ("rtsp".equals(param.getSchema())) { + logger.info("娴佸彉鍖栵細娉ㄥ唽->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); + if (param.isRegist()) { + mediaServerService.addCount(param.getMediaServerId()); + } else { + mediaServerService.removeCount(param.getMediaServerId()); + } + + int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + if (updateStatusResult > 0) { + + } + + if ("rtp".equals(param.getApp()) && !param.isRegist()) { + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); + if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { + inviteStreamService.removeInviteInfo(inviteInfo); + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + } + } else if ("broadcast".equals(param.getApp())) { + // 璇煶瀵硅鎺ㄦ祦 stream闇�瑕佹弧瓒虫牸寮廳eviceId_channelId + if (param.getStream().indexOf("_") > 0) { + String[] streamArray = param.getStream().split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.getDevice(deviceId); + if (device != null) { + if (param.isRegist()) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + playService.stopAudioBroadcast(deviceId, channelId); + } + // 寮�鍚闊冲璁查�氶亾 + try { + playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> { + logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId); + }); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[鍛戒护鍙戦�佸け璐 璇煶瀵硅: {}", e.getMessage()); + } + } else { + // 娴佹敞閿� + playService.stopAudioBroadcast(deviceId, channelId); + } + } else { + logger.info("[璇煶瀵硅] 鏈壘鍒拌澶囷細{}", deviceId); + } + } + } + } else if ("talk".equals(param.getApp())) { + // 璇煶瀵硅鎺ㄦ祦 stream闇�瑕佹弧瓒虫牸寮廳eviceId_channelId + if (param.getStream().indexOf("_") > 0) { + String[] streamArray = param.getStream().split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.getDevice(deviceId); + if (device != null) { + if (param.isRegist()) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + playService.stopAudioBroadcast(deviceId, channelId); + } + // 寮�鍚闊冲璁查�氶亾 + playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> { + logger.info("[璇煶瀵硅] 閫氶亾寤虹珛鎴愬姛, device: {}, channel: {}", deviceId, channelId); + }); + } else { + // 娴佹敞閿� + playService.stopTalk(device, channelId, param.isRegist()); + } + } else { + logger.info("[璇煶瀵硅] 鏈壘鍒拌澶囷細{}", deviceId); + } + } + } + + } else { + if (!"rtp".equals(param.getApp())) { + String type = OriginType.values()[param.getOriginType()].getType(); + if (param.isRegist()) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( + param.getApp(), param.getStream()); + String callId = null; + if (streamAuthorityInfo != null) { + callId = streamAuthorityInfo.getCallId(); + } + StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, + param.getApp(), param.getStream(), tracks, callId); + param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); + redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); + if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { + param.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(param); + + // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 + redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); + } + } else { + // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 + OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( + param.getApp(), param.getStream(), param.getMediaServerId()); + if (onStreamChangedHookParam != null) { + type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); + if ("PUSH".equalsIgnoreCase(type)) { + // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 + redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId()); + } + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { +// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + } + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { + if (userSetting.isUsePushingAsStatus()) { + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); + } + } + if (type != null) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } + } + } + if (!param.isRegist()) { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem == null) { + continue; + } + + if (sendRtpItem.getApp().equals(param.getApp())) { + logger.info(sendRtpItem.toString()); + if (userSetting.getServerId().equals(sendRtpItem.getServerId())) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId()); + // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦 + redisCatchStorage.sendPushStreamClose(messageForPushChannel); + }else { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); + + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� + logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } + } + } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); + } + } + + } + } + } + } + } + }); return HookResult.SUCCESS(); } @@ -220,6 +491,62 @@ logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); + ret.put("code", 0); + // 鍥芥爣绫诲瀷鐨勬祦 + if ("rtp".equals(param.getApp())) { + ret.put("close", userSetting.getStreamOnDemand()); + // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); + // 鐐规挱 + if (inviteInfo != null) { + // 褰曞儚涓嬭浇 + if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) { + ret.put("close", false); + return ret; + } + // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ�� + if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( + inviteInfo.getChannelId()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + try { + commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); + } + redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + } + } + } + Device device = deviceService.getDevice(inviteInfo.getDeviceId()); + if (device != null) { + try { + // 澶氭煡璇竴娆¢槻姝㈠凡缁忚澶勭悊浜� + InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), + inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (info != null) { + cmder.streamByeCmd(device, inviteInfo.getChannelId(), + inviteInfo.getStream(), null); + } else { + logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囩殑鐐规挱淇℃伅锛� {}锛� 娴侊細{}", inviteInfo.getDeviceId(), param.getStream()); + } + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[鏃犱汉瑙傜湅]鐐规挱锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); + } + } else { + logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囷細 {}锛屾祦锛歿}", inviteInfo.getDeviceId(), param.getStream()); + } boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema()); ret.put("code", close); @@ -282,16 +609,22 @@ if (!"rtp".equals(param.getApp())) { return HookResult.SUCCESS(); } - try { - MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this); - MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - if (mediaServerItem != null) { - event.setMediaServer(mediaServerItem); - applicationEventPublisher.publishEvent(event); + taskExecutor.execute(() -> { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); + try { + commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); + } + redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } } - }catch (Exception e) { - logger.info("[ZLM-HOOK-rtp鍙戦�佸叧闂璢 鍙戦�侀�氱煡澶辫触 ", e); - } + }); return HookResult.SUCCESS(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java deleted file mode 100755 index 80699d2..0000000 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.genersoft.iot.vmp.media.zlm; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author lin - */ -@Component -public class ZLMMediaListManager { - - private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager"); - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private GbStreamMapper gbStreamMapper; - - @Autowired - private IStreamPushService streamPushService; - - - @Autowired - private StreamPushMapper streamPushMapper; - - @Autowired - private UserSetting userSetting; - - - @Autowired - private IMediaServerService mediaServerService; - - private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); - - public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { - StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); - StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); - transform.setPushIng(onStreamChangedHookParam.isRegist()); - transform.setUpdateTime(DateUtil.getNow()); - transform.setPushTime(DateUtil.getNow()); - transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); - if (pushInDb == null) { - transform.setCreateTime(DateUtil.getNow()); - streamPushMapper.add(transform); - }else { - streamPushMapper.update(transform); - gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); - } - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); - if ( channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; - } catch (ParseException e) { - logger.error("addPush: ", e); - } - removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); - } - return transform; - } - - public void sendStreamEvent(String app, String stream, String mediaServerId) { - MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); - // 鏌ョ湅鎺ㄦ祦鐘舵�� - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream); - if (streamReady != null && streamReady) { - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); - if (channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(app, stream, mediaServerId); - } catch (ParseException e) { - logger.error("sendStreamEvent: ", e); - } - removedChannelOnlineEventLister(app, stream); - } - } - } - - public int removeMedia(String app, String streamId) { - // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 - GbStream gbStream = gbStreamMapper.selectOne(app, streamId); - int result; - if (gbStream == null) { - result = storager.removeMedia(app, streamId); - }else { - result =storager.mediaOffline(app, streamId); - } - return result; - } - - public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { - this.channelOnPublishEvents.put(app + "_" + stream, callback); - } - - public void removedChannelOnlineEventLister(String app, String stream) { - this.channelOnPublishEvents.remove(app + "_" + stream); - } - - public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { - return this.channelOnPublishEvents.get(app + "_" + stream); - } - -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 3ba48d3..7b0bf39 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -264,4 +264,13 @@ } return result; } + + public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) { + Map<String, Object> param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + return zlmresTfulUtils.stopSendRtp(mediaServerItem, param); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java index 714838e..6b3c94f 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.dto; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; + import java.text.ParseException; /** @@ -7,5 +9,5 @@ */ public interface ChannelOnlineEvent { - void run(String app, String stream, String serverId) throws ParseException; + void run(SendRtpItem sendRtpItem) throws ParseException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 16ff831..3ea8e5f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -99,4 +99,13 @@ void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition); void stopPlay(String deviceId, String channelId); + void batchUpdateChannelGPS(List<DeviceChannel> channelList); + + void batchAddMobilePosition(List<MobilePosition> addMobilePositionList); + + void online(DeviceChannel channel); + + void offline(DeviceChannel channel); + + void delete(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 1507331..c718681 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -115,4 +115,5 @@ Map<String, StreamPushItem> getAllAppAndStreamMap(); + void updatePush(OnStreamChangedHookParam param); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 1a9e3e5..6a4f866 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -61,6 +61,7 @@ messageForPushChannel.setGbId(gbId); messageForPushChannel.setApp(app); messageForPushChannel.setStream(stream); + messageForPushChannel.setServerId(serverId); messageForPushChannel.setMediaServerId(mediaServerId); messageForPushChannel.setPlatFormId(platFormId); messageForPushChannel.setPlatFormName(platFormName); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 6807632..16fc5b5 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; @@ -248,8 +249,24 @@ } @Override + public void online(DeviceChannel channel) { + channelMapper.online(channel.getDeviceId(), channel.getChannelId()); + } + + @Override public int channelsOffline(List<DeviceChannel> channels) { return channelMapper.batchOffline(channels); + } + + + @Override + public void offline(DeviceChannel channel) { + channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); + } + + @Override + public void delete(DeviceChannel channel) { + channelMapper.del(channel.getDeviceId(), channel.getChannelId()); } @Override @@ -358,4 +375,47 @@ public void stopPlay(String deviceId, String channelId) { channelMapper.stopPlay(deviceId, channelId); } + + @Override + @Transactional + public void batchUpdateChannelGPS(List<DeviceChannel> channelList) { + for (DeviceChannel deviceChannel : channelList) { + deviceChannel.setUpdateTime(DateUtil.getNow()); + if (deviceChannel.getGpsTime() == null) { + deviceChannel.setGpsTime(DateUtil.getNow()); + } + } + int count = 1000; + if (channelList.size() > count) { + for (int i = 0; i < channelList.size(); i+=count) { + int toIndex = i+count; + if ( i + count > channelList.size()) { + toIndex = channelList.size(); + } + List<DeviceChannel> channels = channelList.subList(i, toIndex); + channelMapper.batchUpdatePosition(channels); + } + }else { + channelMapper.batchUpdatePosition(channelList); + } + } + + @Override + @Transactional + public void batchAddMobilePosition(List<MobilePosition> mobilePositions) { +// int count = 500; +// if (mobilePositions.size() > count) { +// for (int i = 0; i < mobilePositions.size(); i+=count) { +// int toIndex = i+count; +// if ( i + count > mobilePositions.size()) { +// toIndex = mobilePositions.size(); +// } +// List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex); +// deviceMobilePositionMapper.batchadd(mobilePositionsSub); +// } +// }else { +// deviceMobilePositionMapper.batchadd(mobilePositions); +// } + deviceMobilePositionMapper.batchadd(mobilePositions); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 8fb772d..47b902e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -558,6 +558,7 @@ removeMobilePositionSubscribe(deviceInStore, result->{ // 寮�鍚闃� deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); addMobilePositionSubscribe(deviceInStore); // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹� deviceMapper.updateCustom(deviceInStore); @@ -566,12 +567,14 @@ }else { // 寮�鍚闃� deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); addMobilePositionSubscribe(deviceInStore); } }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 鍙栨秷璁㈤槄 deviceInStore.setSubscribeCycleForMobilePosition(0); + deviceInStore.setMobilePositionSubmissionInterval(0); removeMobilePositionSubscribe(deviceInStore, null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 40de0d2..cd47062 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -28,7 +28,6 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.CloudRecordUtils; @@ -111,6 +110,14 @@ @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; @Autowired private SSRCFactory ssrcFactory; @@ -266,7 +273,6 @@ logger.warn("[鐐规挱] 鏈壘鍒板彲鐢ㄧ殑zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彲鐢ㄧ殑zlm"); } - Device device = redisCatchStorage.getDevice(deviceId); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) { logger.warn("[鐐规挱] 鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦 deviceId: {},channelId:{}", deviceId, channelId); @@ -280,6 +286,8 @@ InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { + // 閲婃斁鐢熸垚鐨剆src锛屼娇鐢ㄤ笂涓�娆$敵璇风殑 + ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); // 鐐规挱鍙戣捣浜嗕絾鏄皻鏈垚鍔�, 浠呮敞鍐屽洖璋冪瓑寰呯粨鏋滃嵆鍙� inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); logger.info("[鐐规挱寮�濮媇 宸茬粡璇锋眰涓紝绛夊緟缁撴灉锛� deviceId: {}, channelId: {}", device.getDeviceId(), channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 7a14940..a64b71a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -633,4 +633,21 @@ public Map<String, StreamPushItem> getAllAppAndStreamMap() { return streamPushMapper.getAllAppAndStreamMap(); } + + @Override + public void updatePush(OnStreamChangedHookParam param) { + StreamPushItem transform = transform(param); + StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); + transform.setPushIng(param.isRegist()); + transform.setUpdateTime(DateUtil.getNow()); + transform.setPushTime(DateUtil.getNow()); + transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); + if (pushInDb == null) { + transform.setCreateTime(DateUtil.getNow()); + streamPushMapper.add(transform); + }else { + streamPushMapper.update(transform); + gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java new file mode 100644 index 0000000..b4bd72c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; + +public interface IRedisRpcService { + + SendRtpItem getSendRtpItem(String sendRtpItemKey); + + WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem); + + WVPResult stopSendRtp(String sendRtpItemKey); + + long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback); + + void stopWaitePushStreamOnline(SendRtpItem sendRtpItem); + + void rtpSendStopped(String sendRtpItemKey); + + void removeCallback(long key); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java deleted file mode 100755 index 68595a8..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ /dev/null @@ -1,451 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.Hook; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - - -/** - * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾� - * @author lin - */ -@Component -public class RedisGbPlayMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); - - public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; - - /** - * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥 - */ - public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; - - /** - * 绂荤嚎鐨勯敊璇帥 - */ - public static final int ERROR_CODE_OFFLINE = -2; - - /** - * 瓒呮椂鐨勯敊璇帥 - */ - public static final int ERROR_CODE_TIMEOUT = -3; - - private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); - private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); - - @Autowired - private UserSetting userSetting; - - - @Autowired - private RedisTemplate<Object, Object> redisTemplate; - - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - - @Autowired - private DynamicTask dynamicTask; - - - @Autowired - private HookSubscribe subscribe; - - private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - public interface PlayMsgCallback{ - void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; - } - - public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(); - } - - public interface PlayMsgErrorCallback{ - void handler(WVPResult wvpResult); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class); - logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg)); - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { - continue; - } - if (WvpRedisMsg.isRequest(wvpRedisMsg)) { - logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(msg.getBody())); - - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class); - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: - RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); - requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - default: - break; - } - - }else { - logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(msg.getBody())); - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - - WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); - - String key = wvpRedisMsg.getSerial(); - switch (content.getCode()) { - case 0: - ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); - PlayMsgCallback playMsgCallback = callbacks.get(key); - if (playMsgCallback != null) { - callbacksForError.remove(key); - try { - playMsgCallback.handler(responseSendItemMsg); - } catch (ParseException e) { - logger.error("[REDIS娑堟伅澶勭悊寮傚父] ", e); - } - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(key); - if (errorCallback != null) { - callbacks.remove(key); - errorCallback.handler(content); - } - break; - default: - break; - } - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); - String serial = wvpRedisMsg.getSerial(); - switch (wvpResult.getCode()) { - case 0: - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); - if (playMsgCallback != null) { - callbacksForError.remove(serial); - playMsgCallback.handler(); - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); - if (errorCallback != null) { - callbacks.remove(serial); - errorCallback.handler(wvpResult); - } - break; - default: - break; - } - break; - default: - break; - } - - } - }catch (Exception e) { - logger.warn("[RedisGbPlayMsg] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[RedisGbPlayMsg] 寮傚父鍐呭锛� ", e); - } - } - }); - } - } - - /** - * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 - */ - private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaServer == null) { - // TODO 鍥炲閿欒 - return; - } - SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg); - - try { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); - }catch (ControllerException e) { - return; - } - - // 鍥炲娑堟伅 - WVPResult<JSONObject> result = new WVPResult<>(); - result.setCode(0); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� - */ - private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); - if (mediaServerItem == null) { - logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); - - WVPResult<SendRtpItem> result = new WVPResult<>(); - result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); - result.setMsg("娴佸獟浣撲笉瀛樺湪"); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - return; - } - // 纭畾娴佹槸鍚﹀湪绾� - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); - if (streamReady != null && streamReady) { - logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); - responseSendItem(mediaServerItem, content, toId, serial); - }else { - // 娴佸凡缁忕绾� - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream()); - - String taskKey = UUID.randomUUID().toString(); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(taskKey, ()->{ - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream()); - WVPResult<SendRtpItem> result = new WVPResult<>(); - result.setCode(ERROR_CODE_TIMEOUT); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) - ); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - }, userSetting.getPlatformPlayTimeout()); - - // 娣诲姞璁㈤槄 - Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId()); - subscribe.addSubscribe(hook, (hookData)->{ - dynamicTask.stop(taskKey); - responseSendItem(mediaServerItem, content, toId, serial); - }); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), - content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), - content.getMediaServerId()); - - String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; - logger.info("[redis鍙戦�侀�氱煡] 鎺ㄦ祦琚姹� {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); - redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); - } - } - - /** - * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� - */ - private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(), - content.getPort(), content.getSsrc(), content.getPlatformId(), - content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp()); - - WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); - result.setCode(0); - ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); - responseSendItemMsg.setSendRtpItem(sendRtpItem); - responseSendItemMsg.setMediaServerItem(mediaServerItem); - result.setData(responseSendItemMsg); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) - ); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭� - * @param serverId 涓嬬骇鏈嶅姟ID - * @param app 搴旂敤鍚� - * @param stream 娴両D - * @param ip 鐩爣IP - * @param port 鐩爣绔彛 - * @param ssrc ssrc - * @param platformId 骞冲彴鍥芥爣缂栧彿 - * @param channelId 閫氶亾ID - * @param isTcp 鏄惁浣跨敤TCP - * @param callback 寰楀埌淇℃伅鐨勫洖璋� - */ - public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, - String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { - RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( - serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName); - requestSendItemMsg.setServerId(serverId); - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, - key, JSON.toJSONString(requestSendItemMsg)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject); - callbacks.put(key, callback); - callbacksForError.put(key, errorCallback); - dynamicTask.startDelay(key, ()->{ - callbacks.remove(key); - callbacksForError.remove(key); - WVPResult<Object> wvpResult = new WVPResult<>(); - wvpResult.setCode(ERROR_CODE_TIMEOUT); - wvpResult.setMsg("timeout"); - errorCallback.handler(wvpResult); - }, userSetting.getPlatformPlayTimeout()); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 鍙戦�佽姹傛帹娴佺殑娑堟伅 - * @param param 鎺ㄦ祦鍙傛暟 - * @param callback 鍥炶皟 - */ - public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject); - dynamicTask.startDelay(key, ()->{ - callbacksForStartSendRtpStream.remove(key); - callbacksForError.remove(key); - }, userSetting.getPlatformPlayTimeout()); - callbacksForStartSendRtpStream.put(key, callback); - callbacksForError.put(key, (wvpResult)->{ - logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg()); - callbacksForStartSendRtpStream.remove(key); - callbacksForError.remove(key); - }); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 鍙戦�佽姹傛帹娴佺殑娑堟伅 - */ - public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鍋滄鎺ㄦ祦] {}: {}", serverId, jsonObject); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { - if (platformGbId == null) { - platformGbId = "*"; - } - if (channelId == null) { - channelId = "*"; - } - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List<Object> scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); - }else { - return null; - } - } - - /** - * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 - */ - private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { - SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); - if (sendRtpItem == null) { - logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 澶辫触锛� sendRtpItem涓篘ULL"); - return; - } - MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaInfo == null) { - // TODO 鍥炲閿欒 - return; - } - - if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) { - logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - // 鍙戦�乺edis娑堟伅 - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java deleted file mode 100755 index fcfeff3..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰 - * @author lin - */ -@Component -public class RedisPushStreamCloseResponseListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); - - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private ISIPCommanderForPlatform commanderFroPlatform; - - @Autowired - private UserSetting userSetting; - - @Autowired - private IMediaServerService mediaServerService; - - - private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); - - public interface PushStreamResponseEvent{ - void run(MessageForPushChannelResponse response); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody())); - MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); - StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); - if (push != null) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( - push.getGbId()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); - if (parentPlatform != null) { - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); - try { - commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); - } - } - if (push.isSelf()) { - // 鍋滄鍚戜笂绾ф帹娴� - logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", sendRtpItem.getStream()); - MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); - mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); - if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } - } - } - } - } - - } - - public void addEvent(String app, String stream, PushStreamResponseEvent callback) { - responseEvents.put(app + stream, callback); - } - - public void removeEvent(String app, String stream) { - responseEvents.remove(app + stream); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java old mode 100755 new mode 100644 diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java deleted file mode 100755 index 85709a7..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.concurrent.ConcurrentLinkedQueue; - - -/** - * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡 - * @author lin - */ -@Component -public class RedisStreamMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private ZLMMediaListManager zlmMediaListManager; - - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - @Override - public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); - continue; - } - String serverId = steamMsgJson.getString("serverId"); - - if (userSetting.getServerId().equals(serverId)) { - // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 - continue; - } - logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); - onStreamChangedHookParam.setSeverId(serverId); - onStreamChangedHookParam.setApp(app); - onStreamChangedHookParam.setStream(stream); - onStreamChangedHookParam.setRegist(register); - onStreamChangedHookParam.setMediaServerId(mediaServerId); - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); - onStreamChangedHookParam.setAliveSecond(0L); - onStreamChangedHookParam.setTotalReaderCount(0); - onStreamChangedHookParam.setOriginType(0); - onStreamChangedHookParam.setOriginTypeStr("0"); - onStreamChangedHookParam.setOriginTypeStr("unknown"); - ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream); - if ( channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(app, stream, serverId);; - } catch (ParseException e) { - logger.error("addPush: ", e); - } - zlmMediaListManager.removedChannelOnlineEventLister(app, stream); - } - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e); - } - } - }); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java new file mode 100644 index 0000000..af79204 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -0,0 +1,304 @@ +package com.genersoft.iot.vmp.service.redisMsg.control; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; + +/** + * 鍏朵粬wvp鍙戣捣鐨剅pc璋冪敤锛岃繖閲岀殑鏂规硶琚� RedisRpcConfig 閫氳繃鍙嶅皠瀵绘壘瀵瑰簲鐨勬柟娉曞悕绉拌皟鐢� + */ +@Component +public class RedisRpcController { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class); + + @Autowired + private SSRCFactory ssrcFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private ZLMServerFactory zlmServerFactory; + + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + + @Autowired + private ISIPCommanderForPlatform commanderFroPlatform; + + + @Autowired + private IVideoManagerStorage storager; + + + /** + * 鑾峰彇鍙戞祦鐨勪俊鎭� + */ + public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } + logger.info("[redis-rpc] 鑾峰彇鍙戞祦鐨勪俊鎭細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦 + MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem == null) { + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 鑷钩鍙板唴瀹� + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.info("[redis-rpc] getSendRtpItem->鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�" ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + response.setBody(sendRtpItemKey); + return response; + } + + /** + * 鐩戝惉娴佷笂绾� + */ + public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + // 鏌ヨ鏈骇鏄惁鏈夎繖涓祦 + MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem != null) { + logger.info("[redis-rpc] 鐩戝惉娴佷笂绾挎椂鍙戠幇娴佸凡瀛樺湪鐩存帴杩斿洖锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setBody(sendRtpItem.getRedisKey()); + response.setStatusCode(200); + } + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + logger.info("[redis-rpc] 鐩戝惉娴佷笂绾匡紝娴佸凡涓婄嚎锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setBody(sendRtpItem.getRedisKey()); + response.setStatusCode(200); + // 鎵嬪姩鍙戦�佺粨鏋� + sendResponse(response); + hookSubscribe.removeSubscribe(hook); + + }); + return null; + } + + /** + * 鍋滄鐩戝惉娴佷笂绾� + */ + public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 鍋滄鐩戝惉娴佷笂绾匡細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + hookSubscribe.removeSubscribe(hook); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } + + + /** + * 寮�濮嬪彂娴� + */ + public RedisRpcResponse startSendRtp(RedisRpcRequest request) { + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 寮�濮嬪彂娴�, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅"); + response.setBody(wvpResult); + return response; + } + logger.info("[redis-rpc] 寮�濮嬪彂娴侊細 {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[redis-rpc] startSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() ); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer"); + response.setBody(wvpResult); + return response; + } + + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (!streamReady) { + logger.info("[redis-rpc] startSendRtp->娴佷笉鍦ㄧ嚎锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "娴佷笉鍦ㄧ嚎"); + response.setBody(wvpResult); + return response; + } + JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); + if (jsonObject.getInteger("code") == 0) { + logger.info("[redis-rpc] 鍙戞祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + WVPResult wvpResult = WVPResult.success(); + response.setBody(wvpResult); + }else { + logger.info("[redis-rpc] 鍙戞祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject); + WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg")); + response.setBody(wvpResult); + } + return response; + } + + /** + * 鍋滄鍙戞祦 + */ + public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒皉edis涓殑鍙戞祦淇℃伅"); + response.setBody(wvpResult); + return response; + } + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[redis-rpc] stopSendRtp->鏈壘鍒癕ediaServer锛� {}", sendRtpItem.getMediaServerId() ); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒癕ediaServer"); + response.setBody(wvpResult); + return response; + } + JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); + if (jsonObject.getInteger("code") == 0) { + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦鎴愬姛锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + response.setBody(WVPResult.success()); + return response; + }else { + int code = jsonObject.getInteger("code"); + String msg = jsonObject.getString("msg"); + logger.info("[redis-rpc] 鍋滄鎺ㄦ祦澶辫触锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}锛� code锛� {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg ); + response.setBody(WVPResult.fail(code, msg)); + return response; + } + } + + /** + * 鍏朵粬wvp閫氱煡鎺ㄦ祦宸茬粡鍋滄浜� + */ + public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) { + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + return response; + } + logger.info("[redis-rpc] 鎺ㄦ祦宸茬粡鍋滄锛� {}/{}, 鐩爣鍦板潃锛� {}锛歿}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + if (platform == null) { + return response; + } + try { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage()); + } + return response; + } + + private void sendResponse(RedisRpcResponse response){ + logger.info("[redis-rpc] >> {}", response); + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java new file mode 100644 index 0000000..8ffb562 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -0,0 +1,155 @@ +package com.genersoft.iot.vmp.service.redisMsg.service; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +@Service +public class RedisRpcServiceImpl implements IRedisRpcService { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcServiceImpl.class); + + @Autowired + private RedisRpcConfig redisRpcConfig; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private SSRCFactory ssrcFactory; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + private RedisRpcRequest buildRequest(String uri, Object param) { + RedisRpcRequest request = new RedisRpcRequest(); + request.setFromId(userSetting.getServerId()); + request.setParam(param); + request.setUri(uri); + return request; + } + + @Override + public SendRtpItem getSendRtpItem(String sendRtpItemKey) { + RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + if (response.getBody() == null) { + return null; + } + return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString()); + } + + @Override + public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) { + logger.info("[璇锋眰鍏朵粬WVP] 寮�濮嬫帹娴侊紝wvp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); + request.setToId(sendRtpItem.getServerId()); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public WVPResult stopSendRtp(String sendRtpItemKey) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦, 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + return WVPResult.fail(ErrorCode.ERROR100.getCode(), "鏈壘鍒板彂娴佷俊鎭�"); + } + logger.info("[璇锋眰鍏朵粬WVP] 鍋滄鎺ㄦ祦锛寃vp锛歿}锛� {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey); + request.setToId(sendRtpItem.getServerId()); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) { + logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + + // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + if (callback != null) { + callback.run(sendRtpItem.getRedisKey()); + } + hookSubscribe.removeSubscribe(hook); + redisRpcConfig.removeCallback(request.getSn()); + }); + + redisRpcConfig.request(request, response -> { + if (response.getBody() == null) { + logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾�,浣嗘槸鏈壘鍒板彂娴佷俊鎭細{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + return; + } + logger.info("[璇锋眰鎵�鏈塛VP鐩戝惉娴佷笂绾縘 娴佷笂绾� {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString()); + + if (callback != null) { + callback.run(response.getBody().toString()); + } + hookSubscribe.removeSubscribe(hook); + }); + return request.getSn(); + } + + @Override + public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) { + logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + hookSubscribe.removeSubscribe(hook); + RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + redisRpcConfig.request(request, 10); + } + + @Override + public void rtpSendStopped(String sendRtpItemKey) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[鍋滄WVP鐩戝惉娴佷笂绾縘 鏈壘鍒皉edis涓殑鍙戞祦淇℃伅锛� key锛歿}", sendRtpItemKey); + return; + } + RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey); + request.setToId(sendRtpItem.getServerId()); + redisRpcConfig.request(request, 10); + } + + @Override + public void removeCallback(long key) { + redisRpcConfig.removeCallback(key); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 1233623..21911b9 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -45,6 +45,8 @@ void updateSendRTPSever(SendRtpItem sendRtpItem); + List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId); + /** * 鏌ヨRTP鎺ㄩ�佷俊鎭紦瀛� * @param platformGbId @@ -197,6 +199,8 @@ void addDiskInfo(List<Map<String, Object>> diskInfo); + void deleteSendRTPServer(SendRtpItem sendRtpItem); + List<SendRtpItem> queryAllSendRTPServer(); List<Device> getAllDevices(); @@ -209,7 +213,7 @@ void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel); - void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel); + void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform); void addPushListItem(String app, String stream, MediaArrivalEvent param); @@ -219,4 +223,11 @@ void sendPushStreamClose(MessageForPushChannel messageForPushChannel); + void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout); + + SendRtpItem getWaiteSendRtpItem(String app, String stream); + + void sendStartSendRtp(SendRtpItem sendRtpItem); + + void sendPushStreamOnline(SendRtpItem sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index c03d73a..10d0ee1 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -52,8 +52,8 @@ "<if test='status != null'>, status=#{status}</if>" + "<if test='streamId != null'>, stream_id=#{streamId}</if>" + "<if test='hasAudio != null'>, has_audio=#{hasAudio}</if>" + - ", custom_longitude=#{longitude}" + - ", custom_latitude=#{latitude}" + + "<if test='customLongitude != null'>, custom_longitude=#{customLongitude}</if>" + + "<if test='customLatitude != null'>, custom_latitude=#{customLatitude}</if>" + "<if test='longitudeGcj02 != null'>, longitude_gcj02=#{longitudeGcj02}</if>" + "<if test='latitudeGcj02 != null'>, latitude_gcj02=#{latitudeGcj02}</if>" + "<if test='longitudeWgs84 != null'>, longitude_wgs84=#{longitudeWgs84}</if>" + @@ -89,8 +89,10 @@ "dc.password, " + "COALESCE(dc.custom_ptz_type, dc.ptz_type) AS ptz_type, " + "dc.status, " + - "COALESCE(dc.custom_longitude, dc.longitude) AS longitude, " + - "COALESCE(dc.custom_latitude, dc.latitude) AS latitude, " + + "dc.longitude, " + + "dc.latitude, " + + "dc.custom_longitude, " + + "dc.custom_latitude, " + "dc.stream_id, " + "dc.device_id, " + "dc.parental, " + @@ -345,6 +347,8 @@ "<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" + "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" + "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" + + "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" + + "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" + "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" + "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" + "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" + @@ -397,6 +401,23 @@ " </script>"}) int updatePosition(DeviceChannel deviceChannel); + @Update({"<script>" + + "<foreach collection='deviceChannelList' item='item' separator=';'>" + + " UPDATE" + + " wvp_device_channel" + + " SET gps_time=#{item.gpsTime}" + + "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" + + "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" + + "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" + + "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" + + "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" + + "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" + + "WHERE device_id=#{item.deviceId} " + + " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" + + "</foreach>" + + "</script>"}) + int batchUpdatePosition(List<DeviceChannel> deviceChannelList); + @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List<DeviceChannel> getAllChannelInPlay(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index 7bf243c..c28b16e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -33,4 +33,33 @@ @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") int clearMobilePositionsByDeviceId(String deviceId); + + @Insert("<script> " + + "insert into wvp_device_mobile_position " + + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + + "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ + "values " + + "<foreach collection='mobilePositions' index='index' item='item' separator=','> " + + "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + + "#{item.createTime}) " + + "</foreach> " + + "</script>") + void batchadd2(List<MobilePosition> mobilePositions); + + @Insert("<script> " + + "<foreach collection='mobilePositions' index='index' item='item' separator=','> " + + "insert into wvp_device_mobile_position " + + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + + "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ + "values " + + "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + + "#{item.createTime}); " + + "</foreach> " + + "</script>") + void batchadd(List<MobilePosition> mobilePositions); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 51878c7..6388932 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -12,6 +12,8 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; @@ -146,15 +148,26 @@ @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + - userSetting.getServerId() + "_" - + sendRtpItem.getMediaServerId() + "_" - + sendRtpItem.getPlatformId() + "_" - + sendRtpItem.getChannelId() + "_" - + sendRtpItem.getStream() + "_" - + sendRtpItem.getCallId(); - redisTemplate.opsForValue().set(key, sendRtpItem); + @Override + public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) { + String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*_" + + platformGbId + "_" + + channelId + "_" + + streamId + "_" + + "*"; + List<SendRtpItem> result = new ArrayList<>(); + List<Object> scan = RedisUtil.scan(redisTemplate, scanKey); + if (!scan.isEmpty()) { + for (Object o : scan) { + String key = (String) o; + result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class)); + } + } + return result; } @Override @@ -172,7 +185,7 @@ callId = "*"; } String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" + + "*_*_" + platformGbId + "_" + channelId + "_" + streamId + "_" @@ -268,9 +281,18 @@ List<Object> scan = RedisUtil.scan(redisTemplate, key); if (scan.size() > 0) { for (Object keyStr : scan) { + logger.info("[鍒犻櫎 redis鐨凷endRTP]锛� {}", keyStr.toString()); redisTemplate.delete(keyStr); } } + } + + /** + * 鍒犻櫎RTP鎺ㄩ�佷俊鎭紦瀛� + */ + @Override + public void deleteSendRTPServer(SendRtpItem sendRtpItem) { + deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream()); } @Override @@ -555,7 +577,7 @@ @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); +// logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 绉诲姩浣嶇疆 {}: {}", key, jsonObject.toString()); redisTemplate.convertAndSend(key, jsonObject); } @@ -646,9 +668,15 @@ } @Override - public void sendPlatformStopPlayMsg(MessageForPushChannel msg) { + public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) { + + MessageForPushChannel msg = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + msg.setPlatFormIndex(platform.getId()); + String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; - logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 涓婄骇骞冲彴鍋滄瑙傜湅 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } @@ -681,4 +709,30 @@ logger.info("[redis鍙戦�侀�氱煡] 鍙戦�� 鍋滄鍚戜笂绾ф帹娴� {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } + + @Override + public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + redisTemplate.opsForValue().set(key, sendRtpItem); + } + + @Override + public SendRtpItem getWaiteSendRtpItem(String app, String stream) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class); + } + + @Override + public void sendStartSendRtp(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + logger.info("[redis鍙戦�侀�氱煡] 閫氱煡鍏朵粬WVP鎺ㄦ祦 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); + } + + @Override + public void sendPushStreamOnline(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; + logger.info("[redis鍙戦�侀�氱煡] 娴佷笂绾� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java index 7ad595d..59a6943 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/log/LogController.java @@ -2,6 +2,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.service.ILogService; import com.genersoft.iot.vmp.storager.dao.dto.LogDto; @@ -92,4 +93,14 @@ logService.clear(); } + @Autowired + private RedisRpcConfig redisRpcConfig; + + @GetMapping("/test/count") + public Object count() { + return redisRpcConfig.getCallbackCount(); + } + + + } diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue index 02e92e7..ef49205 100755 --- a/web_src/src/components/channelList.vue +++ b/web_src/src/components/channelList.vue @@ -326,7 +326,9 @@ e.ptzType = e.ptzType + ""; that.$set(e, "edit", false); that.$set(e, "location", ""); - if (e.longitude && e.latitude) { + if (e.customLongitude && e.customLatitude) { + that.$set(e, "location", e.customLongitude + "," + e.customLatitude); + }else if (e.longitude && e.latitude) { that.$set(e, "location", e.longitude + "," + e.latitude); } }); @@ -481,7 +483,9 @@ e.ptzType = e.ptzType + ""; this.$set(e, "edit", false); this.$set(e, "location", ""); - if (e.longitude && e.latitude) { + if (e.customLongitude && e.customLatitude) { + this.$set(e, "location", e.customLongitude + "," + e.customLatitude); + }else if (e.longitude && e.latitude) { this.$set(e, "location", e.longitude + "," + e.latitude); } }); @@ -603,8 +607,8 @@ this.$message.warning("浣嶇疆淇℃伅鏍煎紡鏈夎锛屼緥锛�117.234,36.378"); return; } else { - row.longitude = parseFloat(segements[0]); - row.latitude = parseFloat(segements[1]); + row.customLongitude = parseFloat(segements[0]); + row.custom_latitude = parseFloat(segements[1]); if (!(row.longitude && row.latitude)) { this.$message.warning("浣嶇疆淇℃伅鏍煎紡鏈夎锛屼緥锛�117.234,36.378"); return; diff --git "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql" "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql" index c229fb1..b14a5c8 100644 --- "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql" +++ "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-mysql-2.7.0.sql" @@ -4,5 +4,15 @@ alter table wvp_device drop switch_primary_sub_stream; +# 绗竴涓ˉ涓佸寘 alter table wvp_platform - add send_stream_ip character varying(50); \ No newline at end of file + add send_stream_ip character varying(50); + +alter table wvp_device + change on_line on_line bool default false; + +alter table wvp_device + change id id serial primary key; + +alter table wvp_device + change ssrc_check ssrc_check bool default false; \ No newline at end of file diff --git "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql" "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql" index c229fb1..b14a5c8 100644 --- "a/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql" +++ "b/\346\225\260\346\215\256\345\272\223/2.7.0/\346\233\264\346\226\260-postgresql-kingbase-2.7.0.sql" @@ -4,5 +4,15 @@ alter table wvp_device drop switch_primary_sub_stream; +# 绗竴涓ˉ涓佸寘 alter table wvp_platform - add send_stream_ip character varying(50); \ No newline at end of file + add send_stream_ip character varying(50); + +alter table wvp_device + change on_line on_line bool default false; + +alter table wvp_device + change id id serial primary key; + +alter table wvp_device + change ssrc_check ssrc_check bool default false; \ No newline at end of file -- Gitblit v1.8.0