package com.xxl.job.core.executor; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.client.AdminBizClient; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.server.EmbedServer; import com.xxl.job.core.thread.JobLogFileCleanThread; import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.TriggerCallbackThread; import com.xxl.job.core.util.IpUtil; import com.xxl.job.core.util.NetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * Created by xuxueli on 2016/3/2 21:14. */ public class XxlJobExecutor { private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); // ---------------------- param ---------------------- private String adminAddresses; private String accessToken; private String appname; private String address; private String ip; private int port; private String logPath; private int logRetentionDays; public void setAdminAddresses(String adminAddresses) { this.adminAddresses = adminAddresses; } public void setAccessToken(String accessToken) { this.accessToken = accessToken; } public void setAppname(String appname) { this.appname = appname; } public void setAddress(String address) { this.address = address; } public void setIp(String ip) { this.ip = ip; } public void setPort(int port) { this.port = port; } public void setLogPath(String logPath) { this.logPath = logPath; } public void setLogRetentionDays(int logRetentionDays) { this.logRetentionDays = logRetentionDays; } // ---------------------- start + stop ---------------------- public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server initEmbedServer(address, ip, port, appname, accessToken); } public void destroy(){ // destory executor-server stopEmbedServer(); // destory jobThreadRepository if (jobThreadRepository.size() > 0) { for (Map.Entry item: jobThreadRepository.entrySet()) { JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job."); // wait for job thread push result to callback queue if (oldJobThread != null) { try { oldJobThread.join(); } catch (InterruptedException e) { logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e); } } } jobThreadRepository.clear(); } jobHandlerRepository.clear(); // destory JobLogFileCleanThread JobLogFileCleanThread.getInstance().toStop(); // destory TriggerCallbackThread TriggerCallbackThread.getInstance().toStop(); } // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList(); } adminBizList.add(adminBiz); } } } } public static List getAdminBizList(){ return adminBizList; } // ---------------------- executor-server (rpc provider) ---------------------- private EmbedServer embedServer = null; private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { // fill ip port port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // accessToken if (accessToken==null || accessToken.trim().length()==0) { logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."); } // start embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); } private void stopEmbedServer() { // stop provider factory if (embedServer != null) { try { embedServer.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } // ---------------------- job handler repository ---------------------- private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap(); public static IJobHandler loadJobHandler(String name){ return jobHandlerRepository.get(name); } public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); } // ---------------------- job thread repository ---------------------- private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; } public static JobThread removeJobThread(int jobId, String removeOldReason){ JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); return oldJobThread; } return null; } public static JobThread loadJobThread(int jobId){ JobThread jobThread = jobThreadRepository.get(jobId); return jobThread; } }