From 1d753b48c023090430c2931fd8e3a45a382eddf8 Mon Sep 17 00:00:00 2001 From: hotcoffie <35990065+hotcoffie@users.noreply.github.com> Date: 星期二, 17 五月 2022 11:53:42 +0800 Subject: [PATCH] Merge branch '648540858:wvp-28181-2.0' into wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java | 96 ++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 88 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index c9572ae..3b021de 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -1,41 +1,114 @@ package com.genersoft.iot.vmp.conf; +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; +import java.time.Instant; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * 鍔ㄦ�佸畾鏃朵换鍔� + * @author lin */ @Component public class DynamicTask { + private final Logger logger = LoggerFactory.getLogger(DynamicTask.class); + @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; - private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); + private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); + private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { - return new ThreadPoolTaskScheduler(); + ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler(); + schedulerPool.setPoolSize(300); + schedulerPool.setWaitForTasksToCompleteOnShutdown(true); + schedulerPool.setAwaitTerminationSeconds(10); + return schedulerPool; + } - public String startCron(String key, Runnable task, int cycleForCatalog) { - stopCron(key); + /** + * 寰幆鎵ц鐨勪换鍔� + * @param key 浠诲姟ID + * @param task 浠诲姟 + * @param cycleForCatalog 闂撮殧 姣 + * @return + */ + public void startCron(String key, Runnable task, int cycleForCatalog) { + ScheduledFuture<?> future = futureMap.get(key); + if (future != null) { + if (future.isCancelled()) { + logger.debug("浠诲姟銆恵}銆戝凡瀛樺湪浣嗘槸鍏抽棴鐘舵�侊紒锛侊紒", key); + } else { + logger.debug("浠诲姟銆恵}銆戝凡瀛樺湪涓斿凡鍚姩锛侊紒锛�", key); + return; + } + } // scheduleWithFixedDelay 蹇呴』绛夊緟涓婁竴涓换鍔$粨鏉熸墠寮�濮嬭鏃秔eriod锛� cycleForCatalog琛ㄧず鎵ц鐨勯棿闅� - ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L); - futureMap.put(key, future); - return "startCron"; + future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog); + if (future != null){ + futureMap.put(key, future); + runnableMap.put(key, task); + logger.debug("浠诲姟銆恵}銆戝惎鍔ㄦ垚鍔燂紒锛侊紒", key); + }else { + logger.debug("浠诲姟銆恵}銆戝惎鍔ㄥけ璐ワ紒锛侊紒", key); + } } - public void stopCron(String key) { + /** + * 寤舵椂浠诲姟 + * @param key 浠诲姟ID + * @param task 浠诲姟 + * @param delay 寤舵椂 /姣 + * @return + */ + public void startDelay(String key, Runnable task, int delay) { + stop(key); + + // 鑾峰彇鎵ц鐨勬椂鍒� + Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay)); + + ScheduledFuture future = futureMap.get(key); + if (future != null) { + if (future.isCancelled()) { + logger.debug("浠诲姟銆恵}銆戝凡瀛樺湪浣嗘槸鍏抽棴鐘舵�侊紒锛侊紒", key); + } else { + logger.debug("浠诲姟銆恵}銆戝凡瀛樺湪涓斿凡鍚姩锛侊紒锛�", key); + return; + } + } + // scheduleWithFixedDelay 蹇呴』绛夊緟涓婁竴涓换鍔$粨鏉熸墠寮�濮嬭鏃秔eriod锛� cycleForCatalog琛ㄧず鎵ц鐨勯棿闅� + future = threadPoolTaskScheduler.schedule(task, startInstant); + if (future != null){ + futureMap.put(key, future); + runnableMap.put(key, task); + logger.debug("浠诲姟銆恵}銆戝惎鍔ㄦ垚鍔燂紒锛侊紒", key); + }else { + logger.debug("浠诲姟銆恵}銆戝惎鍔ㄥけ璐ワ紒锛侊紒", key); + } + } + + public void stop(String key) { if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(true); + Runnable runnable = runnableMap.get(key); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(); + } } } @@ -43,4 +116,11 @@ return futureMap.get(key) != null; } + public Set<String> getAllKeys() { + return futureMap.keySet(); + } + + public Runnable get(String key) { + return runnableMap.get(key); + } } -- Gitblit v1.8.0