From bea63f67e75ea6c38d946c2ee463260fcf815f87 Mon Sep 17 00:00:00 2001 From: Fang <costa11@qq.com> Date: 星期一, 07 三月 2022 14:21:29 +0800 Subject: [PATCH] Merge branch '648540858:wvp-28181-2.0' into wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 13 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 20 + src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 22 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java | 4 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java | 6 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java | 7 src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java | 2 src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java | 7 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java | 8 src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java | 16 + src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 42 ++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 54 ++- sql/mysql.sql | 24 src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java | 26 + src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java | 21 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java | 7 src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java | 23 + src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 5 src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java | 28 + src/main/java/com/genersoft/iot/vmp/service/IPlayService.java | 4 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 4 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 156 ++++++++-- src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java | 10 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java | 4 src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java | 2 src/main/resources/all-application.yml | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 66 +-- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java | 5 src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java | 3 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 29 + src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java | 18 src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java | 24 + src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 18 - src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 83 +++++ src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java | 2 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 37 ++ 39 files changed, 588 insertions(+), 224 deletions(-) diff --git a/sql/mysql.sql b/sql/mysql.sql index d45f6de..7c68429 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -44,7 +44,7 @@ `charset` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `device_deviceId_uindex` (`deviceId`) -) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; +) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -129,7 +129,7 @@ PRIMARY KEY (`id`), UNIQUE KEY `device_channel_id_uindex` (`id`), UNIQUE KEY `device_channel_pk` (`channelId`,`deviceId`) -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; +) ENGINE=InnoDB AUTO_INCREMENT=46 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -198,7 +198,7 @@ PRIMARY KEY (`gbStreamId`) USING BTREE, UNIQUE KEY `app` (`app`,`stream`) USING BTREE, UNIQUE KEY `gbId` (`gbId`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=375 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; +) ENGINE=InnoDB AUTO_INCREMENT=300766 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -228,7 +228,7 @@ `username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, PRIMARY KEY (`id`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=313 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; +) ENGINE=InnoDB AUTO_INCREMENT=962 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -317,7 +317,7 @@ PRIMARY KEY (`id`), UNIQUE KEY `parent_platform_id_uindex` (`id`), UNIQUE KEY `parent_platform_pk` (`serverGBId`) -) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; +) ENGINE=InnoDB AUTO_INCREMENT=23 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -367,7 +367,7 @@ `catalogId` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `deviceChannelId` int NOT NULL, PRIMARY KEY (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; +) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -393,7 +393,7 @@ `id` int NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`), UNIQUE KEY `platform_gb_stream_pk` (`platformId`,`catalogId`,`gbStreamId`) -) ENGINE=InnoDB AUTO_INCREMENT=406 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; +) ENGINE=InnoDB AUTO_INCREMENT=301207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -415,7 +415,6 @@ CREATE TABLE `stream_proxy` ( `id` int NOT NULL AUTO_INCREMENT, `type` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, - `name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `app` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `stream` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `url` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, @@ -431,9 +430,10 @@ `status` bit(1) NOT NULL, `enable_remove_none_reader` bit(1) NOT NULL, `createTime` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `stream_proxy_pk` (`app`,`stream`) -) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; +) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -464,7 +464,7 @@ `mediaServerId` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `stream_push_pk` (`app`,`stream`) -) ENGINE=InnoDB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; +) ENGINE=InnoDB AUTO_INCREMENT=300799 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -501,7 +501,7 @@ LOCK TABLES `user` WRITE; /*!40000 ALTER TABLE `user` DISABLE KEYS */; -INSERT INTO `user` VALUES (1,'admin','21232f297a57a5a743894a0e4a801fc3',1,'2021-04-13 14:14:57','2021-04-13 14:14:57'); +INSERT INTO `user` VALUES (1,'admin','21232f297a57a5a743894a0e4a801fc3',1,'2021 - 04 - 13 14:14:57','2021 - 04 - 13 14:14:57'); /*!40000 ALTER TABLE `user` ENABLE KEYS */; UNLOCK TABLES; @@ -541,4 +541,4 @@ /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; --- Dump completed on 2022-02-25 20:32:21 +-- Dump completed on 2022-03-07 8:26:30 diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index c9572ae..80e39f5 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -5,6 +5,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; +import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; @@ -25,15 +26,38 @@ return new ThreadPoolTaskScheduler(); } + /** + * 寰幆鎵ц鐨勪换鍔� + * @param key 浠诲姟ID + * @param task 浠诲姟 + * @param cycleForCatalog 闂撮殧 + * @return + */ public String startCron(String key, Runnable task, int cycleForCatalog) { - stopCron(key); + stop(key); // scheduleWithFixedDelay 蹇呴』绛夊緟涓婁竴涓换鍔$粨鏉熸墠寮�濮嬭鏃秔eriod锛� cycleForCatalog琛ㄧず鎵ц鐨勯棿闅� ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L); futureMap.put(key, future); return "startCron"; } - public void stopCron(String key) { + /** + * 寤舵椂浠诲姟 + * @param key 浠诲姟ID + * @param task 浠诲姟 + * @param delay 寤舵椂 /绉� + * @return + */ + public String startDelay(String key, Runnable task, int delay) { + stop(key); + Date starTime = new Date(System.currentTimeMillis() + delay * 1000); + // scheduleWithFixedDelay 蹇呴』绛夊緟涓婁竴涓换鍔$粨鏉熸墠寮�濮嬭鏃秔eriod锛� cycleForCatalog琛ㄧず鎵ц鐨勯棿闅� + ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime); + futureMap.put(key, future); + return "startCron"; + } + + public void stop(String key) { if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(true); } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index b69bf68..4ebaf0b 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -59,8 +59,11 @@ redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); // 鍙栨秷璁㈤槄 - sipCommanderForPlatform.unregister(parentPlatform, null, null); - Thread.sleep(500); + sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ + ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); + sipCommanderForPlatform.register(platform, null, null); + }); + // 鍙戦�佸钩鍙版湭娉ㄥ唽娑堟伅 publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java index 4decd2b..d1d0e20 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java @@ -19,8 +19,6 @@ private Long playTimeout = 18000L; - private Boolean waitTrack = Boolean.FALSE; - private Boolean interfaceAuthentication = Boolean.TRUE; private Boolean recordPushLive = Boolean.TRUE; @@ -57,10 +55,6 @@ return playTimeout; } - public Boolean isWaitTrack() { - return waitTrack; - } - public Boolean isInterfaceAuthentication() { return interfaceAuthentication; } @@ -87,10 +81,6 @@ public void setPlayTimeout(Long playTimeout) { this.playTimeout = playTimeout; - } - - public void setWaitTrack(Boolean waitTrack) { - this.waitTrack = waitTrack; } public void setInterfaceAuthentication(boolean interfaceAuthentication) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java index 62d4bec..c6fba3d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.auth; +import com.genersoft.iot.vmp.storager.impl.VideoManagerStoragerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -20,13 +21,24 @@ @Autowired private SIPCommander cmder; + + @Autowired + private VideoManagerStoragerImpl storager; public void onRegister(Device device) { // 鍙湁绗竴娆℃敞鍐屾椂璋冪敤鏌ヨ璁惧淇℃伅锛屽闇�鏇存柊璋冪敤鏇存柊API鎺ュ彛 + // TODO 姝ゅ閿欒鏃犳硶鑾峰彇鍒伴�氶亾 + Device device1 = storager.queryVideoDevice(device.getDeviceId()); if (device.isFirsRegister()) { logger.info("[{}] 棣栨娉ㄥ唽锛屾煡璇㈣澶囦俊鎭互鍙婇�氶亾淇℃伅", device.getDeviceId()); - cmder.deviceInfoQuery(device); - cmder.catalogQuery(device, null); + try { + Thread.sleep(100); + cmder.deviceInfoQuery(device); + Thread.sleep(100); + cmder.catalogQuery(device, null); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 3e5d222..3914fa1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -81,6 +81,10 @@ */ private boolean isPlay; + private byte[] transaction; + + private byte[] dialog; + public String getIp() { return ip; } @@ -200,4 +204,20 @@ public void setPlay(boolean play) { isPlay = play; } + + public byte[] getTransaction() { + return transaction; + } + + public void setTransaction(byte[] transaction) { + this.transaction = transaction; + } + + public byte[] getDialog() { + return dialog; + } + + public void setDialog(byte[] dialog) { + this.dialog = dialog; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java index 9ba0c05..3b611b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java @@ -2,7 +2,10 @@ import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -39,6 +42,9 @@ @Autowired private SipSubscribe sipSubscribe; + @Autowired + private IVideoManagerStorager storager; + public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { super(listenerContainer, userSetup); } @@ -61,15 +67,22 @@ String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_"; if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); - - publisher.platformKeepaliveExpireEventPublish(platformGBId); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId); + if (platform != null) { + publisher.platformKeepaliveExpireEventPublish(platformGBId); + } }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) { String platformGBId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length()); - - publisher.platformRegisterCycleEventPublish(platformGBId); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId); + if (platform != null) { + publisher.platformRegisterCycleEventPublish(platformGBId); + } }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); - publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); + Device device = storager.queryVideoDevice(deviceId); + if (device != null) { + publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); + } }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) { String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java index aa87728..97e480c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java @@ -2,8 +2,13 @@ import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -32,6 +37,9 @@ @Autowired private IVideoManagerStorager storager; + + @Autowired + private VideoStreamSessionManager streamSession; @Autowired private RedisUtil redis; @@ -41,6 +49,14 @@ @Autowired private EventPublisher eventPublisher; + + + @Autowired + private IMediaServerService mediaServerService; + + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; @Override public void onApplicationEvent(OfflineEvent event) { @@ -73,5 +89,15 @@ // TODO 绂荤嚎鍙栨秷璁㈤槄 + // 绂荤嚎閲婃斁鎵�鏈塻src + List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null); + if (ssrcTransactions.size() > 0) { + for (SsrcTransaction ssrcTransaction : ssrcTransactions) { + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + streamSession.remove(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java index 2ab2b23..1b8e7ae 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java @@ -75,7 +75,7 @@ stream.append(","); } stream.append(sendRtpItem.getStreamId()); - redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId()); + redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map<String, Object> param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); @@ -84,9 +84,7 @@ zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); } - } - Timer timer = new Timer(); SipSubscribe.Event okEvent = (responseEvent)->{ timer.cancel(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java index bac8e3d..3b2bd23 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java @@ -4,8 +4,6 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.UserSetup; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +44,7 @@ String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_"; if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { // 鍙栨秷瀹氭椂浠诲姟 - dynamicTask.stopCron(expiredKey); + dynamicTask.stop(expiredKey); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index 3e9f28a..ba8f24c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -86,6 +86,15 @@ return dialog; } + public SIPDialog getDialogByCallId(String deviceId, String channelId, String callID){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callID, null); + if (ssrcTransaction == null) return null; + byte[] dialogByteArray = ssrcTransaction.getDialog(); + if (dialogByteArray == null) return null; + SIPDialog dialog = (SIPDialog)SerializeUtils.deSerialize(dialogByteArray); + return dialog; + } + public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){ if (StringUtils.isEmpty(callId)) callId ="*"; if (StringUtils.isEmpty(stream)) stream ="*"; @@ -95,6 +104,21 @@ return (SsrcTransaction)redisUtil.get((String) scanResult.get(0)); } + public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){ + if (StringUtils.isEmpty(deviceId)) deviceId ="*"; + if (StringUtils.isEmpty(channelId)) channelId ="*"; + if (StringUtils.isEmpty(callId)) callId ="*"; + if (StringUtils.isEmpty(stream)) stream ="*"; + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream; + List<Object> scanResult = redisUtil.scan(key); + if (scanResult.size() == 0) return null; + List<SsrcTransaction> result = new ArrayList<>(); + for (Object keyObj : scanResult) { + result.add((SsrcTransaction)redisUtil.get((String) keyObj)); + } + return result; + } + public String getMediaServerId(String deviceId, String channelId, String stream){ SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return null; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index f0d9033..25dc75b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -63,7 +63,5 @@ } } } - - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index cd2d627..1f58a15 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -96,4 +96,11 @@ * @param recordInfo 褰曞儚淇℃伅 */ boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo); + + /** + * 鍚戝彂璧风偣鎾殑涓婄骇鍥炲bye + * @param platform 骞冲彴淇℃伅 + * @param callId callId + */ + void streamByeCmd(ParentPlatform platform, String callId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 437c69d..a7b67ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -2,6 +2,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -84,6 +85,9 @@ @Autowired private IMediaServerService mediaServerService; + + @Autowired + private DynamicTask dynamicTask; /** @@ -330,7 +334,8 @@ * @param errorEvent sip閿欒璁㈤槄 */ @Override - public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { + public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { String streamId = ssrcInfo.getStream(); try { if (device == null) return; @@ -342,15 +347,13 @@ subscribeKey.put("app", "rtp"); subscribeKey.put("stream", streamId); subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; if (event != null) { event.response(mediaServerItemInUse, json); } - -// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); // StringBuffer content = new StringBuffer(200); @@ -419,7 +422,7 @@ transmitRequest(device, request, (e -> { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); errorEvent.response(e); }), e ->{ // 杩欓噷涓轰緥閬垮厤涓�涓�氶亾鐨勭偣鎾彧鏈変竴涓猚allID杩欎釜鍙傛暟浣跨敤涓�涓浐瀹氬�� @@ -458,8 +461,6 @@ logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - System.out.println(344444); - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; if (event != null) { event.response(mediaServerItemInUse, json); } @@ -565,7 +566,6 @@ logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; event.response(mediaServerItemInUse, json); subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); @@ -662,6 +662,7 @@ @Override public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { try { + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); if (transaction == null) { logger.warn("[ {} -> {}]鍋滄瑙嗛娴佺殑鏃跺�欏彂鐜颁簨鍔″凡涓㈠け", deviceId, channelId); @@ -715,10 +716,9 @@ dialog.sendRequest(clientTransaction); - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callIdHeader.getCallId(), null); if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); - mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } @@ -1169,8 +1169,6 @@ */ @Override public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { - // 娓呯┖閫氶亾 -// storager.cleanChannelsForDevice(device.getDeviceId()); try { StringBuffer catalogXml = new StringBuffer(200); catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index b57a5e4..67cb734 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -5,8 +5,16 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.DateUtil; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.SerializeUtils; +import gov.nist.javax.sip.SipProviderImpl; +import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -18,10 +26,14 @@ import org.springframework.util.StringUtils; import javax.sip.*; +import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; +import javax.sip.header.ViaHeader; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; +import java.lang.reflect.Field; import java.text.ParseException; +import java.util.HashSet; import java.util.List; import java.util.UUID; @@ -38,17 +50,23 @@ private IRedisCatchStorage redisCatchStorage; @Autowired + private IMediaServerService mediaServerService; + + @Autowired private SipSubscribe sipSubscribe; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; @Lazy @Autowired @Qualifier(value="tcpSipProvider") - private SipProvider tcpSipProvider; + private SipProviderImpl tcpSipProvider; @Lazy @Autowired @Qualifier(value="udpSipProvider") - private SipProvider udpSipProvider; + private SipProviderImpl udpSipProvider; @Override public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { @@ -57,13 +75,12 @@ @Override public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { - parentPlatform.setExpires("0"); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); if (parentPlatformCatch != null) { parentPlatformCatch.setParentPlatform(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } - + parentPlatform.setExpires("0"); return register(parentPlatform, null, null, errorEvent, okEvent, false); } @@ -543,4 +560,62 @@ } return true; } + + @Override + public void streamByeCmd(ParentPlatform platform, String callId) { + if (platform == null) { + return; + } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId); + if (sendRtpItem != null) { + String mediaServerId = sendRtpItem.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId()); + } + byte[] dialogByteArray = sendRtpItem.getDialog(); + if (dialogByteArray != null) { + SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray); + SipStack sipStack = udpSipProvider.getSipStack(); + SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog); + if (dialog != sipDialog) { + dialog = sipDialog; + } else { + try { + dialog.setSipProvider(udpSipProvider); + Field sipStackField = SIPDialog.class.getDeclaredField("sipStack"); + sipStackField.setAccessible(true); + sipStackField.set(dialog, sipStack); + Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners"); + eventListenersField.setAccessible(true); + eventListenersField.set(dialog, new HashSet<>()); + + byte[] transactionByteArray = sendRtpItem.getTransaction(); + ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray); + Request byeRequest = dialog.createRequest(Request.BYE); + SipURI byeURI = (SipURI) byeRequest.getRequestURI(); + SIPRequest request = (SIPRequest) clientTransaction.getRequest(); + byeURI.setHost(request.getRemoteAddress().getHostName()); + byeURI.setPort(request.getRemotePort()); + if ("TCP".equals(platform.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); + } else if ("UDP".equals(platform.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); + } + dialog.sendRequest(clientTransaction); + } catch (SipException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index d5bc99b..8556730 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; @@ -22,6 +23,7 @@ import javax.sip.DialogState; import javax.sip.RequestEvent; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; @@ -60,6 +62,9 @@ @Autowired private ZLMHttpHookSubscribe subscribe; + @Autowired + private DynamicTask dynamicTask; + /** * 澶勭悊 ACK璇锋眰 @@ -68,13 +73,16 @@ */ @Override public void process(RequestEvent evt) { - logger.info("ACK璇锋眰锛� {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + logger.info("ACK璇锋眰锛� platformGbId->{}", platformGbId); + // 鍙栨秷璁剧疆鐨勮秴鏃朵换鍔� + dynamicTask.stop(callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = null; @@ -83,15 +91,12 @@ }else { streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); } - System.out.println(JSON.toJSON(streamInfo)); if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStream(sendRtpItem.getStreamId()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - logger.info(platformGbId); - logger.info(channelId); Map<String, Object> param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",streamInfo.getApp()); @@ -100,42 +105,23 @@ param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - // 璁惧鎺ㄦ祦鏌ヨ锛屾垚鍔熷悗鎵嶈兘杞帹 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { -// logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", -// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// } else { -// // 瀵筯ook杩涜璁㈤槄 -// logger.info("绛夊緟璁惧鎺ㄦ祦[{}/{}].......", -// streamInfo.getApp(), streamInfo.getStreamId()); -// Timer timer = new Timer(); -// timer.schedule(new TimerTask() { -// @Override -// public void run() { -// logger.info("璁惧鎺ㄦ祦[{}/{}]瓒呮椂锛岀粓姝㈠悜涓婄骇鎺ㄦ祦", -// finalStreamInfo.getApp() , finalStreamInfo.getStreamId()); -// -// } -// }, 30*1000L); -// // 娣诲姞璁㈤槄 -// JSONObject subscribeKey = new JSONObject(); -// subscribeKey.put("app", "rtp"); -// subscribeKey.put("stream", streamInfo.getStreamId()); -// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId()); -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, -// (MediaServerItem mediaServerItemInUse, JSONObject json) -> { -// logger.info("宸茶幏鍙栬澶囨帹娴乕{}/{}]锛屽紑濮嬪悜涓婄骇鎺ㄦ祦[{}:{}]", -// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); -// timer.cancel(); -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); -// }); -// } - - + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + if (jsonObject.getInteger("code") != 0) { + logger.info("鐩戝惉娴佷互绛夊緟娴佷笂绾縶}/{}", streamInfo.getApp(), streamInfo.getStream()); + // 鐩戝惉娴佷笂绾� + // 娣诲姞璁㈤槄 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", "rtp"); + subscribeKey.put("stream", streamInfo.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + }); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index deda783..2811c4f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; @@ -13,6 +15,8 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.SerializeUtils; +import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -21,6 +25,7 @@ import javax.sip.*; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; @@ -56,6 +61,9 @@ @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private VideoStreamSessionManager streamSession; + @Override public void afterPropertiesSet() throws Exception { // 娣诲姞娑堟伅澶勭悊鐨勮闃� @@ -71,11 +79,12 @@ try { responseAck(evt, Response.OK); Dialog dialog = evt.getDialog(); + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); if (dialog == null) return; if (dialog.getState().equals(DialogState.TERMINATED)) { String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); logger.info("鏀跺埌bye, [{}/{}]", platformGbId, channelId); if (sendRtpItem != null){ String streamId = sendRtpItem.getStreamId(); @@ -87,35 +96,44 @@ logger.info("鍋滄鍚戜笂绾ф帹娴侊細" + streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount == 0) { + if (totalReaderCount <= 0) { logger.info(streamId + "鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦"); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); - }else if (totalReaderCount == -1){ - logger.warn(streamId + " 鏌ユ壘鍏跺畠瑙傜湅鑰呭け璐�"); } } // 鍙兘鏄澶囦富鍔ㄥ仠姝� Device device = storager.queryVideoDeviceByChannelId(platformGbId); if (device != null) { + storager.stopPlay(device.getDeviceId(), channelId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (sendRtpItem != null) { - if (sendRtpItem.isPlay()) { - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); - } - }else { - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo); - } - } - - storager.stopPlay(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); } + SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (ssrcTransactionForPlay != null){ + SIPDialog dialogForPlay = (SIPDialog) SerializeUtils.deSerialize(ssrcTransactionForPlay.getDialog()); + if (dialogForPlay.getCallId().equals(callIdHeader.getCallId())){ + // 閲婃斁ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); + } + } + SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); + if (ssrcTransactionForPlayBack != null) { + // 閲婃斁ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); + } } - } } catch (SipException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 9cf0d1b..44ae50b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -21,6 +22,7 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.SerializeUtils; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; @@ -69,6 +71,9 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + + @Autowired + private DynamicTask dynamicTask; @Autowired private SIPCommander cmder; @@ -261,11 +266,13 @@ } sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlay("Play".equals(sessionName)); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); - Device finalDevice = device; - MediaServerItem finalMediaServerItem = mediaServerItem; Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ @@ -293,7 +300,15 @@ content.append("f=\r\n"); try { + // 瓒呮椂鏈敹鍒癆ck搴旇鍥炲bye,褰撳墠绛夊緟鏃堕棿涓�10绉� + dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ + logger.info("Ack 绛夊緟瓒呮椂"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); + // 鍥炲bye + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + }, 60); responseSdpAck(evt, content.toString(), platform); + } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -324,6 +339,7 @@ if (result.getEvent() != null) { errorEvent.response(result.getEvent()); } + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); try { responseAck(evt, Response.REQUEST_TIMEOUT); } catch (SipException e) { @@ -347,7 +363,9 @@ sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); } sendRtpItem.setPlay(false); - playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{ + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + }); }else { sendRtpItem.setStreamId(streamInfo.getStream()); hookEvent.response(mediaServerItem, null); @@ -369,6 +387,11 @@ // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index d9bfb56..737f752 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -158,6 +158,10 @@ device.setCharset("gb2312"); device.setDeviceId(deviceId); device.setFirsRegister(true); + }else { + if (device.getOnline() == 0) { + device.setFirsRegister(true); + } } device.setIp(received); device.setPort(rPort); @@ -187,7 +191,6 @@ if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); // 娉ㄥ唽鎴愬姛 // 淇濆瓨鍒皉edis - // 涓嬪彂catelog鏌ヨ鐩綍 if (registerFlag == 1 ) { logger.info("[{}] 娉ㄥ唽鎴愬姛! deviceId:" + device.getDeviceId(), requestAddress); publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index a2c6cbf..a765b3a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -27,9 +27,7 @@ import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; -import javax.sip.header.CallIdHeader; import javax.sip.header.ExpiresHeader; -import javax.sip.header.Header; import javax.sip.header.ToHeader; import javax.sip.message.Request; import javax.sip.message.Response; @@ -139,18 +137,16 @@ if (subscribeInfo.getExpires() > 0) { if (redisCatchStorage.getSubscribe(key) != null) { - dynamicTask.stopCron(key); + dynamicTask.stop(key); } String interval = XmlUtil.getText(rootElement, "Interval"); // GPS涓婃姤鏃堕棿闂撮殧 dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); redisCatchStorage.updateSubscribe(key, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { - dynamicTask.stopCron(key); + dynamicTask.stop(key); redisCatchStorage.delSubscribe(key); } - - try { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index b6040aa..ffac1d0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -85,19 +85,18 @@ redisCatchStorage.delPlatformRegisterInfo(callId); parentPlatform.setStatus("娉ㄥ唽".equals(action)); // 鍙栧洖Expires璁剧疆锛岄伩鍏嶆敞閿�杩囩▼涓缃负0 - ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); - String expires = parentPlatformTmp.getExpires(); - parentPlatform.setExpires(expires); - parentPlatform.setId(parentPlatformTmp.getId()); + if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) { + ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); + String expires = parentPlatformTmp.getExpires(); + parentPlatform.setExpires(expires); + parentPlatform.setId(parentPlatformTmp.getId()); + redisCatchStorage.updatePlatformRegister(parentPlatform); + redisCatchStorage.updatePlatformKeepalive(parentPlatform); + parentPlatformCatch.setParentPlatform(parentPlatform); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + } storager.updateParentPlatformStatus(platformGBId, "娉ㄥ唽".equals(action)); - redisCatchStorage.updatePlatformRegister(parentPlatform); - - redisCatchStorage.updatePlatformKeepalive(parentPlatform); - - parentPlatformCatch.setParentPlatform(parentPlatform); - - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 14705bc..62723ac 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -489,7 +489,7 @@ } String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (userSetup.isAutoApplyPlay() && mediaInfo != null) { + if (userSetup.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) { String app = json.getString("app"); String streamId = json.getString("stream"); if ("rtp".equals(app)) { @@ -499,28 +499,16 @@ String channelId = s[1]; Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { - UUID uuid = UUID.randomUUID(); - SSRCInfo ssrcInfo; - String streamId2 = null; - if (mediaInfo.isRtpEnable()) { - streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); - } - ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2); - cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + response.toJSONString()); - playService.onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString()); - }, null); + playService.play(mediaInfo,deviceId, channelId, null, null, null); } - } } - } JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); - return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); + return new ResponseEntity<>(ret.toString(),HttpStatus.OK); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 76bab9c..a0b7e75 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -205,7 +205,7 @@ /** * 璋冪敤zlm RESTful API 鈥斺�� startSendRtp */ - public Boolean startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) { + public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) { Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param); if (jsonObject == null) { @@ -216,7 +216,7 @@ } else { logger.error("RTP鎺ㄦ祦澶辫触: " + jsonObject.getString("msg")); } - return result; + return jsonObject; } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java index 5b0741b..ca6fdfe 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.event; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import org.slf4j.Logger; @@ -34,6 +35,9 @@ @Autowired private IMediaServerService mediaServerService; + @Autowired + private IPlayService playService; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Async @@ -55,6 +59,6 @@ mediaServerService.zlmServerOffline(event.getMediaServerId()); streamProxyService.zlmServerOffline(event.getMediaServerId()); streamPushService.zlmServerOffline(event.getMediaServerId()); - // TODO 澶勭悊瀵瑰浗鏍囩殑褰卞搷 + playService.zlmServerOffline(event.getMediaServerId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 8c12c78..00ec0dd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -58,7 +58,7 @@ void removeCount(String mediaServerId); - void releaseSsrc(MediaServerItem mediaServerItem, String ssrc); + void releaseSsrc(String mediaServerItemId, String ssrc); void clearMediaServerForOnline(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 12bb8fa..80ededa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -17,11 +17,13 @@ void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); - PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); + PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); MediaServerItem getNewMediaServerItem(Device device); void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); + + void zlmServerOffline(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 66407a1..675ed4e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -52,11 +52,9 @@ return false; } logger.info("绉婚櫎鐩綍璁㈤槄: {}", device.getDeviceId()); - dynamicTask.stopCron(device.getDeviceId()); + dynamicTask.stop(device.getDeviceId()); device.setSubscribeCycleForCatalog(0); sipCommander.catalogSubscribe(device, null, null); - // 娓呯┖cseq璁℃暟 - return true; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index f226a37..e578165 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -167,13 +167,14 @@ if (mediaServerItem != null) { String streamId = String.format("%s_%s", deviceId, channelId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); - releaseSsrc(mediaServerItem, ssrc); + releaseSsrc(mediaServerItem.getId(), ssrc); } streamSession.remove(deviceId, channelId, stream); } @Override - public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) { + public void releaseSsrc(String mediaServerItemId, String ssrc) { + MediaServerItem mediaServerItem = getOne(mediaServerItemId); if (mediaServerItem == null || ssrc == null) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 778bd67..2df78b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -5,13 +5,13 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetup; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -37,8 +37,7 @@ import org.springframework.web.context.request.async.DeferredResult; import java.io.FileNotFoundException; -import java.util.Objects; -import java.util.UUID; +import java.util.*; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -51,6 +50,9 @@ @Autowired private SIPCommander cmder; + + @Autowired + private SIPCommanderFroPlatform sipCommanderFroPlatform; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -78,7 +80,9 @@ @Override - public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) { + public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback) { PlayResult playResult = new PlayResult(); RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; @@ -101,29 +105,10 @@ Device device = redisCatchStorage.getDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); - // 瓒呮椂澶勭悊 - result.onTimeout(()->{ - logger.warn(String.format("璁惧鐐规挱瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, streamInfo.getStream()); - if (dialog != null) { - wvpResult.setMsg("鏀舵祦瓒呮椂锛岃绋嶅�欓噸璇�"); - }else { - wvpResult.setMsg("鐐规挱瓒呮椂锛岃绋嶅�欓噸璇�"); - } - msg.setData(wvpResult); - // 鐐规挱瓒呮椂鍥炲BYE - cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream()); - // 閲婃斁rtpserver - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, streamInfo.getStream()); - // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 - resultHolder.invokeAllResult(msg); - // TODO 閲婃斁ssrc - }); result.onCompletion(()->{ // 鐐规挱缁撴潫鏃惰皟鐢ㄦ埅鍥炬帴鍙� + // TODO 搴旇鍦ㄤ笂娴佹椂璋冪敤鏇村ソ锛岀粨鏉熶篃鍙兘鏄敊璇粨鏉� try { String classPath = ResourceUtils.getURL("classpath:").getPath(); // 鍏煎鎵撳寘涓簀ar鐨刢lass璺緞 @@ -161,31 +146,60 @@ if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); + // 瓒呮椂澶勭悊 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("璁惧鐐规挱瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); + if (timeoutCallback != null) { + timeoutCallback.run(); + } + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); + if (dialog != null) { + wvpResult.setMsg("鏀舵祦瓒呮椂锛岃绋嶅�欓噸璇�"); + // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }else { + wvpResult.setMsg("鐐规挱瓒呮椂锛岃绋嶅�欓噸璇�"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + } + + msg.setData(wvpResult); + + // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 + resultHolder.invokeAllResult(msg); + } + }, userSetup.getPlayTimeout()); // 鍙戦�佺偣鎾秷鎭� cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + response.toJSONString()); + timer.cancel(); onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid); if (hookEvent != null) { hookEvent.response(mediaServerItem, response); } }, (event) -> { + timer.cancel(); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); // 鐐规挱杩斿洖sip閿欒 mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + wvpResult.setMsg(String.format("鐐规挱澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); if (errorEvent != null) { errorEvent.response(event); } - - }); } else { String streamId = streamInfo.getStream(); @@ -222,13 +236,41 @@ streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); + // 瓒呮椂澶勭悊 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("璁惧鐐规挱瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); + if (timeoutCallback != null) { + timeoutCallback.run(); + } + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); + if (dialog != null) { + wvpResult.setMsg("鏀舵祦瓒呮椂锛岃绋嶅�欓噸璇�"); + // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }else { + wvpResult.setMsg("鐐规挱瓒呮椂锛岃绋嶅�欓噸璇�"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + } + + msg.setData(wvpResult); + // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 + resultHolder.invokeAllResult(msg); + } + }, userSetup.getPlayTimeout()); cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + response.toJSONString()); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); }, (event) -> { mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); // 閲婃斁ssrc - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); @@ -306,14 +348,33 @@ msg.setId(uuid); msg.setKey(key); PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>(); - result.onTimeout(()->{ - msg.setData("鍥炴斁瓒呮椂"); - playBackResult.setCode(-1); - playBackResult.setData(msg); - callback.call(playBackResult); - }); + + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("璁惧鍥炴斁瓒呮椂锛宒eviceId锛�%s 锛宑hannelId锛�%s", deviceId, channelId)); + playBackResult.setCode(-1); + playBackResult.setData(msg); + callback.call(playBackResult); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); + // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� + if (dialog != null) { + // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧� + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }else { + mediaServerService.releaseSsrc(newMediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + } + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + // 鍥炲涔嬪墠鎵�鏈夌殑鐐规挱璇锋眰 + callback.call(playBackResult); + } + }, userSetup.getPlayTimeout()); cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + response.toJSONString()); + timer.cancel(); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo == null) { logger.warn("璁惧鍥炴斁API璋冪敤澶辫触锛�"); @@ -331,6 +392,7 @@ playBackResult.setResponse(response); callback.call(playBackResult); }, event -> { + timer.cancel(); msg.setData(String.format("鍥炴斁澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg)); playBackResult.setCode(-1); playBackResult.setData(msg); @@ -370,4 +432,26 @@ return streamInfo; } + @Override + public void zlmServerOffline(String mediaServerId) { + // 澶勭悊姝e湪鍚戜笂鎺ㄦ祦鐨勪笂绾у钩鍙� + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { + ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId()); + } + } + } + // 澶勭悊姝e湪瑙傜湅鐨勫浗鏍囪澶� + List<SsrcTransaction> allSsrc = streamSession.getAllSsrc(); + if (allSsrc.size() > 0) { + for (SsrcTransaction ssrcTransaction : allSsrc) { + if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { + cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index a13dc29..39d37c5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -413,12 +413,15 @@ } } - platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); - // 鍙戦�侀�氱煡 - for (String platformId : platformForEvent.keySet()) { - eventPublisher.catalogEventPublishForStream( - platformId, platformForEvent.get(platformId), CatalogEvent.ADD); + if (streamPushItemListFroPlatform.size() > 0) { + platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); + // 鍙戦�侀�氱煡 + for (String platformId : platformForEvent.keySet()) { + eventPublisher.catalogEventPublishForStream( + platformId, platformForEvent.get(platformId), CatalogEvent.ADD); + } } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 1a93902..b0edc06 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -89,7 +89,7 @@ * @param channelId * @return sendRtpItem */ - SendRtpItem querySendRTPServer(String platformGbId, String channelId); + SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId); List<SendRtpItem> querySendRTPServer(String platformGbId); @@ -98,7 +98,7 @@ * @param platformGbId * @param channelId */ - void deleteSendRTPServer(String platformGbId, String channelId); + void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId); /** * 鏌ヨ鏌愪釜閫氶亾鏄惁瀛樺湪涓婄骇鐐规挱锛圧TP鎺ㄩ�侊級 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index d4cace4..2431699 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -135,6 +135,32 @@ "'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " + "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" + "</foreach> " + + "ON DUPLICATE KEY UPDATE " + + "updateTime=VALUES(updateTime), " + + "name=VALUES(name), " + + "manufacture=VALUES(manufacture), " + + "model=VALUES(model), " + + "owner=VALUES(owner), " + + "civilCode=VALUES(civilCode), " + + "block=VALUES(block), " + + "subCount=VALUES(subCount), " + + "address=VALUES(address), " + + "parental=VALUES(parental), " + + "parentId=VALUES(parentId), " + + "safetyWay=VALUES(safetyWay), " + + "registerWay=VALUES(registerWay), " + + "certNum=VALUES(certNum), " + + "certifiable=VALUES(certifiable), " + + "errCode=VALUES(errCode), " + + "secrecy=VALUES(secrecy), " + + "ipAddress=VALUES(ipAddress), " + + "port=VALUES(port), " + + "password=VALUES(password), " + + "PTZType=VALUES(PTZType), " + + "status=VALUES(status), " + + "streamId=VALUES(streamId), " + + "longitude=VALUES(longitude), " + + "latitude=VALUES(latitude)" + "</script>") int batchAdd(List<DeviceChannel> addChannels); @@ -211,4 +237,15 @@ " from device_channel\n" + " where deviceId = #{deviceId}") List<DeviceChannelTree> tree(String deviceId); + + @Delete(value = {" <script>" + + "DELETE " + + "from " + + "device_channel " + + "WHERE " + + "deviceId = #{deviceId} " + + " AND channelId NOT IN " + + "<foreach collection='channels' item='item' open='(' separator=',' close=')' > #{item.channelId}</foreach>" + + " </script>"}) + int cleanChannelsNotInList(String deviceId, List<DeviceChannel> channels); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index b5a3aba..0641348 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -18,6 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -276,19 +277,32 @@ @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId(); + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_" + + sendRtpItem.getStreamId() + "_" + sendRtpItem.getCallId(); redis.set(key, sendRtpItem); } @Override - public SendRtpItem querySendRTPServer(String platformGbId, String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId; - return (SendRtpItem)redis.get(key); + public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { + if (platformGbId == null) platformGbId = "*"; + if (channelId == null) channelId = "*"; + if (streamId == null) streamId = "*"; + if (callId == null) callId = "*"; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + + "_" + channelId + "_" + streamId + "_" + callId; + List<Object> scan = redis.scan(key); + if (scan.size() > 0) { + return (SendRtpItem)redis.get((String)scan.get(0)); + }else { + return null; + } } @Override public List<SendRtpItem> querySendRTPServer(String platformGbId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*"; + if (platformGbId == null) platformGbId = "*"; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*" + "_*" + "_*"; List<Object> queryResult = redis.scan(key); List<SendRtpItem> result= new ArrayList<>(); @@ -306,10 +320,20 @@ * @param channelId */ @Override - public void deleteSendRTPServer(String platformGbId, String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId; - redis.del(key); + public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) { + if (streamId == null) streamId = "*"; + if (callId == null) callId = "*"; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + + "_" + channelId + "_" + streamId + "_" + callId; + List<Object> scan = redis.scan(key); + if (scan.size() > 0) { + for (Object keyStr : scan) { + redis.del((String)keyStr); + } + } } + + /** * 鏌ヨ鏌愪釜閫氶亾鏄惁瀛樺湪涓婄骇鐐规挱锛圧TP鎺ㄩ�侊級 @@ -317,7 +341,7 @@ */ @Override public boolean isChannelSendingRTP(String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId + "*_" + "*_"; List<Object> RtpStreams = redis.scan(key); if (RtpStreams.size() > 0) { return true; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 5e6410b..ce45088 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -284,7 +284,8 @@ logger.debug("[鐩綍鏌ヨ]鏀跺埌鐨勬暟鎹瓨鍦ㄩ噸澶嶏細 {}" , stringBuilder); } try { - int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId); +// int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId); + int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels); int limitCount = 300; boolean result = cleanChannelsResult < 0; if (!result && channels.size() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 1d63909..178ad9b 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.vmanager.gb28181.device; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; @@ -13,7 +14,6 @@ import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; -import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -56,6 +56,9 @@ @Autowired private IDeviceService deviceService; + + @Autowired + private DynamicTask dynamicTask; /** * 浣跨敤ID鏌ヨ鍥芥爣璁惧 @@ -209,6 +212,8 @@ boolean isSuccess = storager.delete(deviceId); if (isSuccess) { redisCatchStorage.clearCatchByDeviceId(deviceId); + // 鍋滄姝よ澶囩殑璁㈤槄鏇存柊 + dynamicTask.stop(deviceId); JSONObject json = new JSONObject(); json.put("deviceId", deviceId); return new ResponseEntity<>(json.toString(),HttpStatus.OK); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java index caca64e..4f4d800 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java @@ -2,8 +2,9 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.gb28181.bean.CatalogData; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -40,6 +41,9 @@ private final static Logger logger = LoggerFactory.getLogger(PlatformController.class); @Autowired + private UserSetup userSetup; + + @Autowired private IVideoManagerStorager storager; @Autowired @@ -50,6 +54,9 @@ @Autowired private SipConfig sipConfig; + + @Autowired + private DynamicTask dynamicTask; /** * 鑾峰彇鍥芥爣鏈嶅姟鐨勯厤缃� @@ -222,7 +229,7 @@ if (updateResult) { // 淇濆瓨鏃跺惎鐢ㄥ氨鍙戦�佹敞鍐� if (parentPlatform.isEnable()) { - if (parentPlatformOld.isStatus()) { + if (parentPlatformOld != null && parentPlatformOld.isStatus()) { commanderForPlatform.unregister(parentPlatformOld, null, null); try { Thread.sleep(500); @@ -287,8 +294,9 @@ boolean deleteResult = storager.deleteParentPlatform(parentPlatform); storager.delCatalogByPlatformId(parentPlatform.getServerGBId()); storager.delRelationByPlatformId(parentPlatform.getServerGBId()); - - + // 鍋滄鍙戦�佷綅缃闃呭畾鏃朵换鍔� + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + parentPlatform.getServerGBId(); + dynamicTask.stop(key); if (deleteResult) { return new ResponseEntity<>("success", HttpStatus.OK); } else { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index fd70690..4a22546 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -88,7 +88,7 @@ // 鑾峰彇鍙敤鐨剒lm Device device = storager.queryVideoDevice(deviceId); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null); + PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null); return playResult.getResult(); } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index 853ec56..955b68b 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -150,7 +150,7 @@ JSONObject result = new JSONObject(); result.put("error", "channel[ " + code + " ] " + eventResult.msg); resultDeferredResult.setResult(result); - }); + }, null); return resultDeferredResult; } diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index f3f1fb3..e90f5a1 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -170,8 +170,6 @@ save-position-history: false # 鐐规挱绛夊緟瓒呮椂鏃堕棿,鍗曚綅锛氭绉� play-timeout: 3000 - # 绛夊緟闊宠棰戠紪鐮佷俊鎭啀杩斿洖锛� true锛� 鍙互鏍规嵁缂栫爜閫夋嫨鍚堥�傜殑鎾斁鍣紝false锛� 鍙互鏇村揩鐐规挱 - wait-track: false # 鏄惁寮�鍚帴鍙i壌鏉� interface-authentication: true # 鑷姩閰嶇疆redis 鍙互杩囨湡浜嬩欢 -- Gitblit v1.8.0