Java线程池工作原理

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池,所以我们就要认识并弄懂线程池,以便于更好为业务场景服务。
(异步与并发)

一、线程池的好处

在开发过程中,合理地使用线程池大致有3个好处:

  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

但是,要做到合理利用线程池,必须对其实现原理了如指掌。

二、线程池工作流程

1)当提交一个新任务到线程池时,线程池判断corePoolSize线程池是否都在执行任务,如果有空闲线程,则创建一个新的工作线程来执行任务,直到当前线程数等于corePoolSize;

2)如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

3)如果阻塞队列满了,那就创建新的线程执行当前任务,直到线程池中的线程数达到maxPoolSize,这时再有任务来,由饱和策略来处理提交的任务。

三、线程池参数

下面是ThreadPoolExecutor类的构造方法传参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ThreadPoolExecutor(
int corePoolSize, #核心线程数
int maximumPoolSize, #最大线程数
long keepAliveTime, #达到最大线程数数时候,线程池的工作线程空闲后,保持存活的时间
TimeUnit unit, #keepAliveTime单位
BlockingQueue<Runnable> workQueue #阻塞队列
RejectedExecutionHandler handler #饱和策略

) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

new ThreadPoolExecutor(6 ,12, 5L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());

比如corePoolSize为6,maximumPoolSize为12,keepAliveTime为5秒,队列长度为10;提交任务数达到核心线程数6时候,新来的任务就会被放入LinkedBlockingQueue阻塞队列。
当队列任务数达到10个时候,就会创建新线程执行任务,直到达到maximumPoolSize数量12。如果还有新来的任务,由策略来处理提交的任务;如果没有,线程池空闲时候,超过5秒,创建的maximumPoolSize,就会被销毁。

四、阻塞队列

阻塞队列BlockingQueue接口,从jdk1.5开始,有四个实现类,jdk8亦是如此

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个列。

  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

五、饱和策略

当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。

  • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。

当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。

六、向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法

  • 1、execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
  • 2、submit()方法用于提交需要返回值的任务。

线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit),在指定的时间内会等待任务执行,超时则抛出超时异常,等待时候会阻塞当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.mine.test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ThreadPoolTest {
public static void main(String[] args) {

// 新建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, // 核心线程数
12, // 最大线程数
5L, // KeepAlive Time long
TimeUnit.SECONDS, // TimeOut
new LinkedBlockingQueue<Runnable>(10), // 阻塞队列
new ThreadPoolExecutor.CallerRunsPolicy()// 饱和策略
);

// 向线程池提交任务
// 1、execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功

threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(
"执行当前线程体,线程名: " + Thread.currentThread().getName() + "当前:" + System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

Future<?> future = threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("执行当前线程体,线程名: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

/**
* 线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,
* get()方法会阻塞当前线程直到任务完成, 而使用get(long timeout,TimeUnit
* unit),在指定的时间内会等待任务执行,超时则抛出超时异常,等待时候会阻塞当前线程
*/
try {
// 阻塞当前线程,直到任务完成
Object obj = future.get();
// 当前线程等待执行结果的返回值,延迟2s
Object obj2 = future.get(10, TimeUnit.MINUTES);

} catch (InterruptedException e) {
// 处理中断异常
// TODO: handle exception
} catch (ExecutionException e) {
// 处理执行异常
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// 处理超时异常
e.printStackTrace();
} finally {// 关闭线程池
threadPoolExecutor.shutdown();
}
threadPoolExecutor.shutdown();
}
}

七、关闭线程池

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程

shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。

shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务
都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

因此,判断线程池所有线程是否执行完成,可以这样写:

1
2
3
4
5
6
7
while(true){//死循环
if(threadPool.isTerminated()) {
//执行自己的操作
break;//true停止
}
Thread.sleep(500);//休眠500继续循环
}

shutdown,只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程,等待执行任务的线程完成。

shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

八、线程池状态

线程池有五种运行状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(2) 状态切换:线程池的初始化状态是RUNNING。线程池被一旦被创建,
就处于RUNNING状态,且线程池中的任务数为0

2SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN

3STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP

4TIDYING

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为
TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,
进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务
也为空时,就会由 SHUTDOWN->TIDYING。当线程池在STOP状态下,线程池中执行的任务
为空时,就会由STOP -> TIDYING

5TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,
就会由 TIDYING -> TERMINATED