线程池主要解决两个问题:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {//1. 如果任务为null,则NPEif (command == null)throw new NullPointerException();//2. 获取当前线程池的状态 + 线程个数的变量的组合值int c = ctl.get();// 3. 当前线程池个数是否小于corePoolSize,如果小于则开启新线程执行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//4. 如果线程池处于running状态,则添加任务到阻塞队列if (isRunning(c) && workQueue.offer(command)) {// 4.1 二次检查int recheck = ctl.get();//4.2 如果当前线程池不是running,则从队列中删除任务,并执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);//4.3 否则如果当前线程池为空,则添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//5. 如果队列满,则新增线程,新增失败则执行拒绝策略else if (!addWorker(command, false))reject(command);}
addWorker方法为新增线程。
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}
首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {www.hack95.com();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 统计线程完成任务个数,并从工作线程中删除当前workercompletedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//尝试设置当前线程池状态为TERMINATED,如果当前是SHUTDOWN状态并且工作队列为空//或者当前是stop状态,当前线程池没有活动线程tryTerminate();int c = ctl.get();//如果当前线程个数小于核销个数,则增加if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}
调用shutdown方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。该方法会立刻返回,并不等待队列任务完成再返回
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}
线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。