1. 什么是线程池
2. ThreadPoolExecutor源码
2.1 继承关系
Executor
execute(Runnable): 用来执行传进去的任务ExecutorService
submit()、invokeAll()、invokeAny() 以及shutDown() 等AbstractExecutorService
基本实现了ExecutorService中生命的所有方法ThreadPoolExecutor
execute() : 向线程池提交一个任务,交给线程池去执行
submit() : 用来向线程池提交任务,能够返回任务执行的结果,实际还是调用execute()方法,利用了Feature来获取任务的执行结果
shutdown()和shutdownNow() : 用来关闭线程池
2.2 构造方法
1 | public ThreadPoolExecutor(int corePoolSize, |
1 | public ThreadPoolExecutor(int corePoolSize, |
- corePoolSize:线程池中用来工作的核心的线程数量。
- maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
- keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
- unit:keepAliveTime 的时间单位。
- workQueue:任务队列,是一个阻塞队列,当线程数已达到核心线程数,会将任务存储在阻塞队列中。
- threadFactory :线程池内部创建线程所用的工厂。
- handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理该任务。
3. 线程池的运行原理
execute() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}线程池刚被创建出来
刚创建出来的线程池中只有一个构造时传入的阻塞队列,默认是没有任何线程。提交一个任务
提交一个任务,当前线程池的线程数小于核心线程数。
当任务执行完毕,线程不会退出,而是会阻塞队列中获取任务。
注意:提交任务时候,即使阻塞队列中没有任务,只要线程池线程数小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。
再次提交一个任务(线程池中的线程数不再小于核心线程数)
提交一个任务,当线程池里的线程数不再小于核心线程数,则会将任务放入阻塞队列。
再次提交一个任务(阻塞队列已满)
提交一个任务,阻塞队列已满,但是线程池当前线程数小于最大线程数,则创建非核心线程来执行提交的任务。
注意:即使队列中有任务,但是新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务,先提交的任务不一定先执行
再次提交一个任务(超过最大线程数)
提交一个任务,当前线程池线程数大于线程池最大线程数,则执行拒绝策略。
注意:默认四种
- AbortPolicy:丢弃任务,抛出运行时异常
- CallerRunsPolicy:由提交任务的线程来执行任务
- DiscardPolicy:丢弃这个任务,但是不抛异常
- DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务
总结
4. 线程池中线程实现复用的原理
线程池的核心功能就是实现了线程的重复利用。
4.1 内部类Worker继承关系
1 | private final class Worker |
4.2 内部类Worker的run()方法解析
1 | public void run() { |
注意:Worker因为继承了AQS,每次在执行任务之前都会调用Worker的lock方法,执行完任务之后,会调用unlock方法;好处是:通过Worker的加锁状态就能判断出当前线程是否正在执行任务。判断线程是否正在运行任务,调用Worker的tryLock方法,加锁成功说明当前线程没有加锁,也就没有在执行任务。在执行shutdown方法关闭线程池的时候,就用这种方式判断线程有没有在执行任务,如果没有正在运行任务,则尝试打断没有执行任务的线程。
5. 线程是如何获取任务以及如何实现超时
5.1 getTask方法实现
1 | private Runnable getTask() { |
注意:如果将allowCoreThreadTimeOut设置为true,那么所有线程走到这个timed都是true,那么所有的线程,包括核心线程都可以做到超时退出。如果线程池需要将核心线程超时退出,可以通过设置allowCoreThreadTimeOut为true
5.2 总结
6. 线程池的五种状态
- RUNNING:线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
- SHUTDOWN:调用shutdown方法线程池就会转换成SHUTDOWN状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中任务。
- STOP:调用shutdownNow方法线程池就会转换成STOP状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。
- TIDYING:SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态。线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
- TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。
5种状态的流转:
7. 线程池的关闭
7.1 shutdown()方法
注意:将线程池的状态修改为SHUTDOWN,然后尝试打断空闲线程。
7.2 shutdownNow()方法
注意:此处将线程池的状态修改stop,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么shutdownNow不能执行剩余任务的原因。
8. 线程池的监控
- getCompletedTaskCount:已经执行完成的任务数量
- getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过。
- getActiveCount:获取正在执行任务的线程数据
- getPoolSize:获取当前线程池中线程数量的大小
除了线程池提供的上述已经实现的方法,同时线程池也预留了很多扩展方法。比如在runWorker方法里面,在执行任务之前会回调beforeExecute方法,执行任务之后会回调afterExecute方法,而这些方法默认都是空实现,你可以自己继承ThreadPoolExecutor来扩展重写这些方法,来实现自己想要的功能。
9. Executrors构造线程池以及问题分析
9.1 newFixedThreadPool - 固定线程数量的线程池
注意:由于使用了LinkedBlockingQueue,队列的容量默认是无限大,实际使用中出现任务过多时会导致内存溢出。
9.2 newSingleThreadExecutor - 单个线程数量的线程池
9.3 newScheduledThreadPool - 带定时调度功能的线程池
9.4 newCachedThreadPool - 接近无限大数量的线程池
注意:由于核心线程数无限大,当任务过多的时候,会导致创建大量的线程,可能机器负载过高,导致服务宕机。
10. 如何合理自定义线程池
10.1 线程数
线程数的设置主要取决于业务是IO密集型还是CPU密集型。
CPU密集型指的是任务主要使用来进行大量的计算,没有什么导致线程阻塞。一般这种场景的线程数设置为CPU核心数+1。
IO密集型:当执行任务需要大量的io,比如磁盘io,网络io,可能会存在大量的阻塞,所以在IO密集型任务中使用多线程可以大大地加速任务的处理。一般线程数设置为 2*CPU核心数
java中用来获取CPU核心数的方法:
1 | Runtime.getRuntime().availableProcessors(); |
10.2 线程工厂
一般建议自定义线程工厂,构建线程的时候设置线程的名称,这样就在查日志的时候就方便知道是哪个线程执行的代码。
10.3 有界队列
一般需要设置有界队列的大小,比如LinkedBlockingQueue在构造的时候就可以传入参数,来限制队列中任务数据的大小,这样就不会因为无限往队列中扔任务导致系统的oom。