package com.xxl.job.core.thread;

import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;

/**
 * Handler thread: owns one {@link IJobHandler} and executes the triggers queued for it,
 * one at a time, until it is asked to stop.
 *
 * <p>Triggers are enqueued from other threads via {@link #pushTriggerQueue(TriggerParam)};
 * the stop request is delivered through {@link #toStop(String)}. The fields shared between
 * those callers and the worker loop are volatile or thread-safe collections.
 *
 * @author xuxueli 2016-1-16 19:52:47
 */
public class JobThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(JobThread.class);

    /** How long one queue poll blocks; kept short so the stop flag is re-checked regularly. */
    private static final long QUEUE_POLL_TIMEOUT_SECONDS = 3L;
    /** Number of consecutive empty polls after which the thread asks to be removed. */
    private static final int MAX_IDLE_TIMES = 30;
    /** Handle messages longer than this are cut and suffixed with "...". */
    private static final int MAX_HANDLE_MSG_LENGTH = 50000;

    private final int jobId;
    private final IJobHandler handler;
    private final LinkedBlockingQueue<TriggerParam> triggerQueue;
    // avoid repeat trigger for the same TRIGGER_LOG_ID
    // NOTE(review): element type assumes TriggerParam#getLogId() returns long -- confirm.
    private final Set<Long> triggerLogIdSet;

    private volatile boolean toStop = false;
    // volatile: written by the thread calling toStop(String), read by the worker loop
    private volatile String stopReason;

    // true while a trigger is being executed; volatile because isRunningOrHasQueue()
    // is a public query that other threads call
    private volatile boolean running = false;
    // consecutive empty polls; only touched by the worker loop
    private int idleTimes = 0;

    /**
     * Creates a job thread bound to one job id and its handler.
     *
     * @param jobId   id passed to {@code XxlJobExecutor.removeJobThread} when the thread idles out
     * @param handler handler whose init/execute/destroy methods this thread drives
     */
    public JobThread(int jobId, IJobHandler handler) {
        this.jobId = jobId;
        this.handler = handler;
        this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
        this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
    }

    /**
     * @return the handler this thread executes
     */
    public IJobHandler getHandler() {
        return handler;
    }

    /**
     * Adds a new trigger to the queue, rejecting a log id that is already queued.
     *
     * @param triggerParam trigger to enqueue
     * @return {@code ReturnT.SUCCESS} when queued, or a {@code FAIL_CODE} result when the
     *         same log id is already waiting in the queue
     */
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // Set.add reports whether the id was newly inserted, which makes the duplicate
        // check and the insertion one atomic step on the synchronized set.
        if (!triggerLogIdSet.add(triggerParam.getLogId())) {
            logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
            return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    /**
     * Asks the job thread to stop.
     *
     * <p>Thread.interrupt only ends a thread's blocking state (wait, join, sleep) by raising
     * InterruptedException at the blocking point; it does not terminate the running thread
     * itself. Shutting this thread down completely therefore relies on a shared variable.
     *
     * @param stopReason reason reported in the callbacks of killed or unexecuted triggers
     */
    public void toStop(String stopReason) {
        // Publish the reason before raising the flag so that a worker observing
        // toStop == true also observes the matching reason.
        this.stopReason = stopReason;
        this.toStop = true;
    }

    /**
     * @return {@code true} if a trigger is currently executing or at least one is queued
     */
    public boolean isRunningOrHasQueue() {
        return running || !triggerQueue.isEmpty();
    }

    /**
     * Worker loop: initialises the handler, executes queued triggers until stopped,
     * reports every still-queued trigger as killed, then destroys the handler.
     */
    @Override
    public void run() {

        // init
        try {
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while (!toStop) {
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            try {
                // poll with a timeout instead of take(), so the toStop flag is re-checked each cycle
                triggerParam = triggerQueue.poll(QUEUE_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (triggerParam != null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());
                    executeTrigger(triggerParam);
                } else if (idleTimes > MAX_IDLE_TIMES && triggerQueue.isEmpty()) {
                    // queue re-checked to avoid a concurrent trigger causing jobId-lost
                    XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobHelper.log("\n----------- JobThread toStop, stopReason:" + stopReason);
                }

                // handle result
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();

                XxlJobHelper.handleFail(errorMsg);

                XxlJobHelper.log("\n----------- JobThread Exception:" + errorMsg
                        + "\n----------- xxl-job job execute end(error) -----------");
            } finally {
                if (triggerParam != null) {
                    pushExecutionCallback(triggerParam);
                }
            }
        }

        // callback trigger request in queue
        failQueuedTriggers();

        // destroy
        try {
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }

    /** Builds the job context for one trigger, runs the handler and validates the handle result. */
    private void executeTrigger(TriggerParam triggerParam) throws Exception {
        // log filename, like "logPath/yyyy-MM-dd/9999.log"
        String logFileName = XxlJobFileAppender.makeLogFileName(
                new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
        XxlJobContext xxlJobContext = new XxlJobContext(
                triggerParam.getJobId(),
                triggerParam.getExecutorParams(),
                logFileName,
                triggerParam.getBroadcastIndex(),
                triggerParam.getBroadcastTotal());

        // init job context
        XxlJobContext.setXxlJobContext(xxlJobContext);

        // execute
        XxlJobHelper.log("\n----------- xxl-job job execute start -----------\n----------- Param:"
                + xxlJobContext.getJobParam());

        if (triggerParam.getExecutorTimeout() > 0) {
            executeWithTimeout(triggerParam, xxlJobContext);
        } else {
            // just execute
            handler.execute();
        }

        // valid execute handle data
        if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
            XxlJobHelper.handleFail("job handle result lost.");
        } else {
            String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
            if (tempHandleMsg != null && tempHandleMsg.length() > MAX_HANDLE_MSG_LENGTH) {
                tempHandleMsg = tempHandleMsg.substring(0, MAX_HANDLE_MSG_LENGTH).concat("...");
            }
            XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
        }
        XxlJobHelper.log("\n----------- xxl-job job execute end(finish) -----------\n----------- Result: handleCode="
                + XxlJobContext.getXxlJobContext().getHandleCode()
                + ", handleMsg = "
                + XxlJobContext.getXxlJobContext().getHandleMsg()
        );
    }

    /** Runs the handler on a separate thread and waits at most the trigger's executor timeout (seconds). */
    private void executeWithTimeout(final TriggerParam triggerParam, final XxlJobContext xxlJobContext)
            throws Exception {
        Thread futureThread = null;
        try {
            FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    // init job context on the thread that actually runs the handler
                    XxlJobContext.setXxlJobContext(xxlJobContext);

                    handler.execute();
                    return true;
                }
            });
            futureThread = new Thread(futureTask);
            futureThread.start();

            futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            XxlJobHelper.log("\n----------- xxl-job job execute timeout");
            XxlJobHelper.log(e);

            // handle result
            XxlJobHelper.handleTimeout("job execute timeout ");
        } finally {
            // null when creating the task or thread failed; guarding keeps that
            // original failure from being replaced by a NullPointerException
            if (futureThread != null) {
                futureThread.interrupt();
            }
        }
    }

    /** Pushes the callback for a trigger that was taken from the queue: its result, or "killed" if stopping. */
    private void pushExecutionCallback(TriggerParam triggerParam) {
        if (!toStop) {
            // common
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.getXxlJobContext().getHandleCode(),
                    XxlJobContext.getXxlJobContext().getHandleMsg())
            );
        } else {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_COCE_FAIL,
                    stopReason + " [job running, killed]")
            );
        }
    }

    /** Drains the queue after the loop ends and reports each remaining trigger as failed (not executed). */
    private void failQueuedTriggers() {
        TriggerParam pending;
        while ((pending = triggerQueue.poll()) != null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    pending.getLogId(),
                    pending.getLogDateTime(),
                    XxlJobContext.HANDLE_COCE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }
}