1. 什么是线程池

2. ThreadPoolExecutor源码

2.1 继承关系

  • Executor
    execute(Runnable): 用来执行传进去的任务

  • ExecutorService
    submit()、invokeAll()、invokeAny() 以及shutDown() 等

  • AbstractExecutorService
    基本实现了ExecutorService中生命的所有方法

  • ThreadPoolExecutor
    execute() : 向线程池提交一个任务,交给线程池去执行
    submit() : 用来向线程池提交任务,能够返回任务执行的结果,实际还是调用execute()方法,利用了Feature来获取任务的执行结果
    shutdown()和shutdownNow() : 用来关闭线程池

2.2 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize:线程池中用来工作的核心的线程数量。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
  • keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
  • unit:keepAliveTime 的时间单位。
  • workQueue:任务队列,是一个阻塞队列,当线程数已达到核心线程数,会将任务存储在阻塞队列中。
  • threadFactory :线程池内部创建线程所用的工厂。
  • handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理该任务。

3. 线程池的运行原理

  1. execute() 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
     public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
    }
  2. 线程池刚被创建出来
    刚创建出来的线程池中只有一个构造时传入的阻塞队列,默认是没有任何线程。

    图片

  3. 提交一个任务

    提交一个任务,当前线程池的线程数小于核心线程数。

    图片

    当任务执行完毕,线程不会退出,而是会阻塞队列中获取任务。

    图片

    注意:提交任务时候,即使阻塞队列中没有任务,只要线程池线程数小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。

  4. 再次提交一个任务(线程池中的线程数不再小于核心线程数)

    提交一个任务,当线程池里的线程数不再小于核心线程数,则会将任务放入阻塞队列。

    图片

  5. 再次提交一个任务(阻塞队列已满)

    提交一个任务,阻塞队列已满,但是线程池当前线程数小于最大线程数,则创建非核心线程来执行提交的任务。

    图片

    注意:即使队列中有任务,但是新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务,先提交的任务不一定先执行

  6. 再次提交一个任务(超过最大线程数)

    提交一个任务,当前线程池线程数大于线程池最大线程数,则执行拒绝策略。

    图片

    注意:默认四种

    1. AbortPolicy:丢弃任务,抛出运行时异常
    2. CallerRunsPolicy:由提交任务的线程来执行任务
    3. DiscardPolicy:丢弃这个任务,但是不抛异常
    4. DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务
  7. 总结

    图片

4. 线程池中线程实现复用的原理

线程池的核心功能就是实现了线程的重复利用。

4.1 内部类Worker继承关系

1
2
3
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

4.2 内部类Worker的run()方法解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 public void run() {
runWorker(this);
}
/**
* 线程执行完任务不会退出的原因,runWorker内部使用了while死循环,当第一个任务
* 执行完成之后,会不断地通过getTask方法获取任务,只要能获取到任务,就会调用run
* 方法,继续执行任务;
*
* 如果getTask获取不到的时候,就会调用finally中的processWorkerExit方法,
* 来将线程退出。
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

注意:Worker因为继承了AQS,每次在执行任务之前都会调用Worker的lock方法,执行完任务之后,会调用unlock方法;好处是:通过Worker的加锁状态就能判断出当前线程是否正在执行任务。判断线程是否正在运行任务,调用Worker的tryLock方法,加锁成功说明当前线程没有加锁,也就没有在执行任务。在执行shutdown方法关闭线程池的时候,就用这种方式判断线程有没有在执行任务,如果没有正在运行任务,则尝试打断没有执行任务的线程。

5. 线程是如何获取任务以及如何实现超时

5.1 getTask方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 获取到的任务是否可以超时退出?true 或者 当前线程数 大于 核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
// 允许超时,则会调用poll,传入keppAliveTime(构造线程池时传入的空闲时间),从阻塞keepAliveTime时间来获取任务,获取不到就返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 一直阻塞获取任务,直到从队列中获取到任务
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

注意:如果将allowCoreThreadTimeOut设置为true,那么所有线程走到这个timed都是true,那么所有的线程,包括核心线程都可以做到超时退出。如果线程池需要将核心线程超时退出,可以通过设置allowCoreThreadTimeOut为true

5.2 总结

图片

6. 线程池的五种状态

  • RUNNING:线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
  • SHUTDOWN:调用shutdown方法线程池就会转换成SHUTDOWN状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中任务。
  • STOP:调用shutdownNow方法线程池就会转换成STOP状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。
  • TIDYING:SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态。线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
  • TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。

5种状态的流转:

图片

7. 线程池的关闭

7.1 shutdown()方法

图片

注意:将线程池的状态修改为SHUTDOWN,然后尝试打断空闲线程。

7.2 shutdownNow()方法

图片

注意:此处将线程池的状态修改stop,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么shutdownNow不能执行剩余任务的原因。

8. 线程池的监控

  • getCompletedTaskCount:已经执行完成的任务数量
  • getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过。
  • getActiveCount:获取正在执行任务的线程数据
  • getPoolSize:获取当前线程池中线程数量的大小

除了线程池提供的上述已经实现的方法,同时线程池也预留了很多扩展方法。比如在runWorker方法里面,在执行任务之前会回调beforeExecute方法,执行任务之后会回调afterExecute方法,而这些方法默认都是空实现,你可以自己继承ThreadPoolExecutor来扩展重写这些方法,来实现自己想要的功能。

9. Executrors构造线程池以及问题分析

9.1 newFixedThreadPool - 固定线程数量的线程池

图片

注意:由于使用了LinkedBlockingQueue,队列的容量默认是无限大,实际使用中出现任务过多时会导致内存溢出。

9.2 newSingleThreadExecutor - 单个线程数量的线程池

图片

9.3 newScheduledThreadPool - 带定时调度功能的线程池

图片

9.4 newCachedThreadPool - 接近无限大数量的线程池

图片

注意:由于核心线程数无限大,当任务过多的时候,会导致创建大量的线程,可能机器负载过高,导致服务宕机。

10. 如何合理自定义线程池

10.1 线程数

线程数的设置主要取决于业务是IO密集型还是CPU密集型。

CPU密集型指的是任务主要使用来进行大量的计算,没有什么导致线程阻塞。一般这种场景的线程数设置为CPU核心数+1。

IO密集型:当执行任务需要大量的io,比如磁盘io,网络io,可能会存在大量的阻塞,所以在IO密集型任务中使用多线程可以大大地加速任务的处理。一般线程数设置为 2*CPU核心数

java中用来获取CPU核心数的方法:

1
Runtime.getRuntime().availableProcessors();

10.2 线程工厂

一般建议自定义线程工厂,构建线程的时候设置线程的名称,这样就在查日志的时候就方便知道是哪个线程执行的代码。

10.3 有界队列

一般需要设置有界队列的大小,比如LinkedBlockingQueue在构造的时候就可以传入参数,来限制队列中任务数据的大小,这样就不会因为无限往队列中扔任务导致系统的oom。


本站由 卡卡龙 使用 Stellar 1.27.0 主题创建

本站访问量 次. 本文阅读量 次.