JDK1. 7中的ThreadPoolExecutor
线程池,顾名思义一个线程的池子,池子里存放了非常多能够复用的线程,假设不用线程池相似的容器,每当我们须要创建新的线程时都须要去new Thread()
,用完之后就被回收了,线程的启动回收都须要用户态到内核态的交互,频繁的创建开销比較大。而且随着线程数的增加,会引起CPU频繁的上下文切换严重影响性能。
这时候线程池相似的容器就发挥出了作用。线程池里面的线程不但能够复用,而且还能够控制线程并发的数量,是CPU的性能达到最优。以下一点一点的分析一下我们使用线程池时线程池都做了些什么?
字段
先从线程池ThreadPoolExecutor中的字段開始(事实上JDK源代码中的英文注解已经说的非常明确了,可能比翻译过来或者我说的都明确):
/*AtomicInteger类型,用来标识线程池的状态,以及线程池里面线程的数量,初始值为1110 0000 0000 0000 0000 0000 0000 0000 前三位是线程池的状态,当中:000 SHUTDOWN 不接受新任务可是处理堵塞队列中的任务010 TIDYING 全部任务都被终止,工作线程为0001 STOP 不接受新任务也不处理堵塞队列中的任务而且中断全部线程池中正在运行的任务011 TERMINATED 不接受新任务也不处理堵塞队列中的任务而且中断全部线程池中正在运行的任务111 RUNNING 接受新的任务并处理堵塞队列中的任务注:关于堵塞队列。兴许会说,暂且理解为一个存放还未运行线程的队列就好。*/ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; //在某些情况下用来存储任务,并将任务提供给线程池中的工作线程 private final BlockingQueueworkQueue; //用来对pooSize、corePoolSize、maximumPoolSize、runState、workers改动时候同步 private final ReentrantLock mainLock = new ReentrantLock(); //线程池中全部线程的集合。訪问和改动须要mainLock的配合 private final HashSet workers = new HashSet (); //用来支持waitTemination private final Condition termination = mainLock.newCondition(); //跟踪线程池中线程的最大值。详细的推測是为了矫正poolsize。訪问和改动须要配合mainLock private int largestPoolSize; //已完毕任务的数量,在任务处于Terminate状态时才更新。訪问和改动须要mainLock的配合 private long completedTaskCount; /* * 一下參数都是用户控制的。全部被声明为了Volatile类型的值,这样能够确保在多线程下。每一个 * 线程都能够获取到最新值。
*/
//线程工厂。用户能够自己定义,以便在想线程池创建线程时附加一些个人操作 private volatile ThreadFactory threadFactory; //当线程池处于shutdown或者处于饱和时运行的拒绝策略 private volatile RejectedExecutionHandler handler; //设置线程池中空暇线程等待多时毫秒被回收 private volatile long keepAliveTime; //指定线程池中的空暇线程是否一段时间被回收,false一直存活 private volatile boolean allowCoreThreadTimeOut; //核心线程池大小,若allowCoreThreadTimeOut被设置,全部空暇超时被回收的情况下会为0 private volatile int corePoolSize; //最大线程池大小。不得超过CAPACITY private volatile int maximumPoolSize; //默认的拒绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
分析完上面的字段,以下我们着重说一下堵塞队列,拒绝策略。
堵塞队列 我们先临时了解他们是干什么的。详细可參考这篇文章 从源代码剖析了堵塞队列。
ArrayBlockingQueue:基于数组实现的一个堵塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。
而且能够指定公平性与非公平性,默认情况下为非公平的。即不保证等待时间最长的队列最优先能够訪问队列。(数目固定,也就是说当线程池中提交任务的速度大于处理速度时。堵塞队列非常快饱和,此时非常easy造成被提交的任务被抛弃的情况)
LinkedBlockingQueue:基于链表实现的一个堵塞队列,在创建LinkedBlockingQueue对象时假设不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是。它会依照元素的优先级对元素进行排序,依照优先级顺序出队,每次出队的元素都是优先级最高的元素。
注意。此堵塞队列为无界堵塞队列。即容量没有上限(通过源代码就能够知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue。一种延时堵塞队列。DelayQueue中的元素仅仅有当其指定的延迟时间到了。才干够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被堵塞,而仅仅有获取数据的操作(消费者)才会被堵塞。
拒绝策略 同堵塞队列一样。我们先有一个概念上的理解。
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务。可是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后又一次尝试运行任务(反复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
构造函数
以下我们開始从ThreadPoolExecutor源代码上来分析原理。
我们常常使用线程池的方式就是new 一个ThreadPoolExecutor对象。通过调用该对象的submit
或者execute
函数,把我们的任务交给线程池。先看ThreadPoolExecutor的构造函数
//间接调用最后一个构造函数。採用默认的拒绝策略AbortPolicy和默认的线程工厂ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)//间接调用最后一个构造函数,採用默认的默认的线程工厂ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue , RejectedExecutionHandler)//间接调用最后一个构造函数,採用默认的拒绝策略AbortPolicyThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue , ThreadFactory)//前面三个分别调用了最后一个ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue , ThreadFactory, RejectedExecutionHandler)//最后一个构造函数的详细实现public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {//參数合法性检验,核心线程数目、最大线程数目、线程空暇回收时间不得小于0,最大线程池不得小于核心线程数数目 if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException();//參数合法性检验。堵塞队列。线程工厂,决绝策略不得为空 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize;//核心线程数大小 this.maximumPoolSize = maximumPoolSize;//最大线程数目 this.workQueue = workQueue;//堵塞队列 this.keepAliveTime = unit.toNanos(keepAliveTime);//空暇回收时间 this.threadFactory = threadFactory;//线程工厂 this.handler = handler;//拒绝策略 }
详细函数
看完构造函数看一下向线程池提交线程的两个函数submit
和execute
public Future
> submit(Runnable task) public <T> Future<T> submit(Runnable task, T result) public <T> Future<T> submit(Callable<T> task) //三个构造函数均是返回一个Future。关于Future内容还有非常多,临时我们仅仅须要理解。我们通过Future能够获取到提交线程的运行结果和抛出的异常,能够监视线程的运行。以 //public Future<?
> submit(Runnable task) //为例简介一下。
//sumit函数是在ThreadPoolExecute的父类AbstractExecutorService实现的,终于还是调用的子类的execute方法public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask;}
首先大家要把这三个函数搞明确。差点儿每一个函数都会用到这几个函数:
runStateOf(int c)
是通过与的方式,在clt字段中获取到clt的前三位,也就是线程池的状态标识。 workerCountOf(int c)
是通过与的方式。在clt字段中获取到clt的后29位,也就是线程池中的线程数量。
ctlOf(int rs, int wc)
是通过或的方式,将改动后的线程池状态rs和线程池中线程数量打包成clt。 isRunning(int c)
SHUTDOWN的状态是0左移29为得到的,比他大的均是线程池停止或销毁状态 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean isRunning(int c) { return c < SHUTDOWN; }
以下着重看一下execute方法。个人感觉,1.7的execute方法要比1.6的该方法清楚非常多。以下的(1)相应着代码中的(1)。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();//AtomicInteger //(1) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(2) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//(21) reject(command); else if (workerCountOf(recheck) == 0)//(22) addWorker(null, false);//为什么是false }//(3) else if (!addWorker(command, false)) reject(command); }
(1)首先查看了当前线程池中的线程数量是否小于我们指定的核心线程池的数目,假设是就尝试新建一个线程,把command作为他的第一个任务,并把他们增加到线程池中。
可是我们在推断了线程池的数量合法后,调用addWorker(command, true)
把线程增加到线程池中时,是多线程并发的,可能会导致增加失败。假设增加成功。则直接返回,若假如失败,则又一次获取clt。由于此时clt必发生了变化,否则不会失败,继续往下运行(2)。
isRunning(c)
推断假设线程池还在运行。那我们就尝试把当前的command增加到堵塞队列中。 增加的过程也是并发的,也可能会出现失败。假设失败在继续运行(3)。增加堵塞队列成功后我们要又一次在检查一遍,防止在增加的过程中线程时关闭了或者线程池中没有线程了。全部由于空暇时间超过了我们指定的alivetime被回收了。假设是线程池已经不再是RUNNING状态,则用我们的拒绝策略去丢弃它(21)。
假设是线程池没有了线程,那我们新建一个空线程。让他去堵塞队列中去获取任务运行(22)。
(3)假设上面的两步都没有运行成功,那我们此时就须要使用我们指定的最大线程池,来处理它,可是此时也是可能失败的,可能有多个线程运行么。假设失败。就用拒绝策略丢弃该线程。
整个的流程大概是就上面的三部,以下我们详细分析一下里面用到的函数
private boolean addWorker(Runnable firstTask, boolean core)final void reject(Runnable command)public boolean remove(Runnable task)
一个一个分析。
addWorker
好长,好唬人。大部分内容直接写在了凝视中。
private boolean addWorker(Runnable firstTask, boolean core) { //(1)循环CAS操作,将线程池中的线程数+1. retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //core true代表是往核心线程池中增加线程 false代表往最大线程池中增加线程 //线程数超标,不能再增加了,直接返回 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS改动clt的值+1。在线程池中为将要增加的线程流出空间,成功退出cas循环,失败继续 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //假设线程池的状态发生了变化回到retry外层循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //(2)新建线程。并增加到线程池workers中。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //对workers操作要通过加锁来实现 final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //细化锁的力度。防止临界区过大,浪费时间 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); //推断线程池的状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //推断增加的任务状态,假设已经開始丢出异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //将新建的线程增加到线程池中 workers.add(w); int s = workers.size(); //修正largestPoolSize的值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //线程增加线程池成功,则开启新创建的线程 if (workerAdded) { t.start();//(3) workerStarted = true; } } } finally { //线程增加线程池失败或者线程start失败,则须要调用addWorkerFailed函数,假设增加成功则须要移除,并回复clt的值 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
可能会有人问,当时我也困惑。为什么把线程池中线程数目增加和增加到works中两部拆开。为什么不在+1之后马上运行新建线程并增加到works中的操作。我认为是放到一块可阅读性比較差,而且(2)代码块中的try-catch-finnaly会变的不好控制。
//比較简单,就是调用了我们指定的拒绝策略去丢弃当前的任务 final void reject(Runnable command) { handler.rejectedExecution(command, this); }
//在堵塞队列中移除task任务。并尝试改动线程池的状态 public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed; }
Worker
关于线程池是怎么复用当中的线程,这些都跟ThreadPoolExecutor中的静态类Worker有关。addWorker
代码的(3)处,正式调用了Worker的run方法启动的线程。
以下我们分析一下Worker类。
继承自AQS。具有锁的功能,实现了Runable接口。具有线程的功能。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker的主要字段就以下三个,代码也比較简单。
//线程池中正真运行的线程。通过我们指定的线程工厂创建而来 final Thread thread; //线程包装的任务。
thread 在run时主要调用了该任务的run方法
Runnable firstTask; //记录当前线程完毕的任务数 volatile long completedTasks;
Worker的构造函数
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker(写的非常清楚) this.firstTask = firstTask; //利用我们指定的线程工厂创建一个线程,注意,參数是this,也就是在运行thread.run时,正真运行的是我们Woker类的run方法 this.thread = getThreadFactory().newThread(this); }
我们看一下Worker的run方法你会发现,run有调用了ThreadPoolExecutor的runWorker()
方法。调来调去,好头疼。事实上并没有那么复杂,worker就这样了。我们再回到ThreadPoolExecutor看一下他的runWorker
都是干了什么。
public void run() { runWorker(this); }
ThreadPoolExecutor
的runworker
源代码和凝视解析
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); //线程池处于stop状态或者当前线程被中断时,线程池状态是stop状态。可是当前线程没有中断,则发出中断请求 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //開始运行任务前的Hook,相似回调函数 beforeExecute(wt, task); Throwable thrown = null; try { //运行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任务运行后的Hook,相似回调函数 afterExecute(task, thrown); } } finally { //运行完毕后task重置,completedTasks计数器++,解锁 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //线程空暇达到我们设定的值时,Worker退出销毁。 processWorkerExit(w, completedAbruptly); } }
runWorker函数中最重要的是getTask(),他不断的从堵塞队列中取任务交给线程运行。以下分析一下
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //假设线程池处于shutdown状态,而且队列为空,或者线程池处于stop或者terminate状态。在线程池数量-1。返回null,回收线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //标识当前线程在空暇时,是否应该超时回收 boolean timed; for (;;) { int wc = workerCountOf(c); //假设allowCoreThreadTimeOut 为ture或者当前线程数量大于核心线程池数目。则须要超时回收 timed = allowCoreThreadTimeOut || wc > corePoolSize; //(1) //假设线程数目小于最大线程数目,且不同意超时回收或者未超时,则跳出循环,继续去堵塞队列中取任务(2) if (wc <= maximumPoolSize && ! (timedOut && timed)) break; //假设上面if没有成立,则当前线程数-1,返回null,回收该线程 if (compareAndDecrementWorkerCount(c)) return null; //假设上面if没有成立,则CAS改动ctl失败。重读,cas循环又一次尝试改动 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } (2) try { //假设同意空暇回收。则调用堵塞队列的poll。否则take,一直等到队列中有可取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //取到任务。返回任务,否则超时timedOut = true;进入下一个循环,而且在(1)处会不成立。进而进入到cas改动ctl的程序中 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
堵塞队列
在之前的函数execute和上面的函数中都提到了 workQueue这个字段,以下我们看一下堵塞队列的详细实现。 并分析一下offer。take和poll三个函数的详细源代码。
以LinkedBlockingQueue
为列子。
先看一下基本字段
//静态类链表节点 static class Node{ E item; Node next; Node(E x) { item = x; } } //队列的容量 private final int capacity; //队列中当前节点的数目 private final AtomicInteger count = new AtomicInteger(0); //队列头结点 private transient Node head; //队列尾节点 private transient Node last; //take, poll 函数持有的锁 private final ReentrantLock takeLock = new ReentrantLock(); //队列空了后。等待条件 private final Condition notEmpty = takeLock.newCondition(); //put, offer持有的锁 private final ReentrantLock putLock = new ReentrantLock(); //队列满了后。等待条件 private final Condition notFull = putLock.newCondition();
字段都非常好理解,以下看一下详细的函数
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //队列已满,直接返回 if (count.get() == capacity) return false; int c = -1; Nodenode = new Node(e); //offer和poll。take的锁分离 final ReentrantLock putLock = this.putLock; putLock.lock(); try { //继续推断,防止并发导致队列中节点数量变化 if (count.get() < capacity) { enqueue(node); //返回的是旧值,不注意会掉坑里 c = count.getAndIncrement(); //队列未满,通知等待线程能够继续入队列了 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } // 假设c==0。怎么回事?一開始假设是个空队列,就会是这种值,要注意的是。上边的c返回的是旧值。 if (c == 0) signalNotEmpty(); return c >= 0; } private void enqueue(Node node) { //此时持有putLock。线程安全 last = last.next = node; }
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //可中断锁 takeLock.lockInterruptibly(); try { //不停的循环,不是一次等待timeout而是分多次,提高获取到线程的概率 while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); //通知等待notEmpty条件的线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //与poll(int。int)不同的是不会超时返回,会一直堵塞 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
我们假设指定了线程池中的线程超时回收,那么线程池中的线程在获取任务时就会在获取不到指定的时间后。超时回收,调用的是poll(int,int)。假设没有指定调用的则是take()会一直堵塞到队列中有新的任务到来。
当中与JDK1.6中的相比,有了非常大的变化1.7大量运用了移位。而且将线程池中的状态和线程数量打包在了一起,在思路上也更加清楚了。只是,整体的流程还是一样的,知识在详细细节的实现上有所不同。