| | |
| | | package com.genersoft.iot.vmp.conf; |
| | | |
| | | import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; |
| | | import org.apache.commons.lang3.ObjectUtils; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.Date; |
| | | import javax.annotation.PostConstruct; |
| | | import java.time.Instant; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * 动态定时任务 |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class DynamicTask { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger(DynamicTask.class); |
| | | private final Logger logger = LoggerFactory.getLogger(DynamicTask.class); |
| | | |
| | | @Autowired |
| | | private ThreadPoolTaskScheduler threadPoolTaskScheduler; |
| | | |
| | | private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); |
| | | private Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); |
| | | private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); |
| | | private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); |
| | | |
| | | @Bean |
| | | public ThreadPoolTaskScheduler threadPoolTaskScheduler() { |
| | | ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler(); |
| | | schedulerPool.setPoolSize(300); |
| | | schedulerPool.setWaitForTasksToCompleteOnShutdown(true); |
| | | schedulerPool.setAwaitTerminationSeconds(10); |
| | | return schedulerPool; |
| | | |
| | | @PostConstruct |
| | | public void DynamicTask() { |
| | | threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); |
| | | threadPoolTaskScheduler.setPoolSize(300); |
| | | threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); |
| | | threadPoolTaskScheduler.setAwaitTerminationSeconds(10); |
| | | threadPoolTaskScheduler.initialize(); |
| | | } |
| | | |
| | | /** |
| | | * 循环执行的任务 |
| | | * @param key 任务ID |
| | | * @param task 任务 |
| | | * @param cycleForCatalog 间隔 |
| | | * @param cycleForCatalog 间隔 毫秒 |
| | | * @return |
| | | */ |
| | | public void startCron(String key, Runnable task, int cycleForCatalog) { |
| | | ScheduledFuture future = futureMap.get(key); |
| | | if(ObjectUtils.isEmpty(key)) { |
| | | return; |
| | | } |
| | | ScheduledFuture<?> future = futureMap.get(key); |
| | | if (future != null) { |
| | | if (future.isCancelled()) { |
| | | logger.info("任务【{}】已存在但是关闭状态!!!", key); |
| | | logger.debug("任务【{}】已存在但是关闭状态!!!", key); |
| | | } else { |
| | | logger.info("任务【{}】已存在且已启动!!!", key); |
| | | logger.debug("任务【{}】已存在且已启动!!!", key); |
| | | return; |
| | | } |
| | | } |
| | | // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 |
| | | future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); |
| | | future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog); |
| | | if (future != null){ |
| | | futureMap.put(key, future); |
| | | runnableMap.put(key, task); |
| | | logger.info("任务【{}】启动成功!!!", key); |
| | | logger.debug("任务【{}】启动成功!!!", key); |
| | | }else { |
| | | logger.info("任务【{}】启动失败!!!", key); |
| | | logger.debug("任务【{}】启动失败!!!", key); |
| | | } |
| | | } |
| | | |
| | |
| | | * @return |
| | | */ |
| | | public void startDelay(String key, Runnable task, int delay) { |
| | | if(ObjectUtils.isEmpty(key)) { |
| | | return; |
| | | } |
| | | stop(key); |
| | | Date starTime = new Date(System.currentTimeMillis() + delay); |
| | | |
| | | // 获取执行的时刻 |
| | | Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay)); |
| | | |
| | | ScheduledFuture future = futureMap.get(key); |
| | | if (future != null) { |
| | | if (future.isCancelled()) { |
| | | logger.info("任务【{}】已存在但是关闭状态!!!", key); |
| | | logger.debug("任务【{}】已存在但是关闭状态!!!", key); |
| | | } else { |
| | | logger.info("任务【{}】已存在且已启动!!!", key); |
| | | logger.debug("任务【{}】已存在且已启动!!!", key); |
| | | return; |
| | | } |
| | | } |
| | | // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 |
| | | future = threadPoolTaskScheduler.schedule(task, starTime); |
| | | future = threadPoolTaskScheduler.schedule(task, startInstant); |
| | | if (future != null){ |
| | | futureMap.put(key, future); |
| | | runnableMap.put(key, task); |
| | | logger.info("任务【{}】启动成功!!!", key); |
| | | logger.debug("任务【{}】启动成功!!!", key); |
| | | }else { |
| | | logger.info("任务【{}】启动失败!!!", key); |
| | | logger.debug("任务【{}】启动失败!!!", key); |
| | | } |
| | | } |
| | | |
| | | public void stop(String key) { |
| | | if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { |
| | | futureMap.get(key).cancel(true); |
| | | Runnable runnable = runnableMap.get(key); |
| | | if (runnable instanceof ISubscribeTask) { |
| | | ISubscribeTask subscribeTask = (ISubscribeTask) runnable; |
| | | subscribeTask.stop(); |
| | | } |
| | | public boolean stop(String key) { |
| | | if(ObjectUtils.isEmpty(key)) { |
| | | return false; |
| | | } |
| | | boolean result = false; |
| | | if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { |
| | | result = futureMap.get(key).cancel(true); |
| | | futureMap.remove(key); |
| | | runnableMap.remove(key); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | public boolean contains(String key) { |
| | | if(ObjectUtils.isEmpty(key)) { |
| | | return false; |
| | | } |
| | | return futureMap.get(key) != null; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | public Runnable get(String key) { |
| | | if(ObjectUtils.isEmpty(key)) { |
| | | return null; |
| | | } |
| | | return runnableMap.get(key); |
| | | } |
| | | |
| | | /** |
| | | * 每五分钟检查失效的任务,并移除 |
| | | */ |
| | | @Scheduled(cron="0 0/5 * * * ?") |
| | | public void execute(){ |
| | | if (futureMap.size() > 0) { |
| | | for (String key : futureMap.keySet()) { |
| | | if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) { |
| | | futureMap.remove(key); |
| | | runnableMap.remove(key); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | public boolean isAlive(String key) { |
| | | return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled(); |
| | | } |
| | | } |