Java 线程池

线程复用

解决执行大量异步任务时效率和资源配置问题

  • 效率:重用,减少开销,提升响应速度
  • 解耦:线程创建和执行分离,业务不绑定线程

基础接口


Executor是任务调度的基础抽象,将任务逻辑和如何运行分离
任务逻辑就是纯Runnable,何时执行、是否多线程等具体执行方式由Executor负责

Executor.java
1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService顾名思义进行服务化功能扩展,包含两大块:任务提交和服务关闭
提交submit同时支持Runnable和Callable

ExecutorService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public interface ExecutorService extends Executor {
//进入关闭状态,不再接受新任务,但是之前提交的还会执行,执行完再关
void shutdown();

//尝试立刻中止运行,返回尚未执行到的任务
//通过发送interrupt的方式中止,如果线程不响应中断,那么就还会运行下去
List<Runnable> shutdownNow();

//是否进入关闭流程
boolean isShutdown();

//是否在关闭流程后,运行完以前提交的任务,即彻底完成关闭
boolean isTerminated();

//进入关闭流程后,等待一段时间让任务执行
//true已终止,false还没
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

//提交有返回值的任务
<T> Future<T> submit(Callable<T> task);
//提交无返回值的任务,成功运行结束就返回结果参数
<T> Future<T> submit(Runnable task, T result);
//提交无返回值任务,由于没有预设结果,获取到null值就代表成功
Future<?> submit(Runnable task);

//返回多个任务的所有结果,其他的取消
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
//超时未完成任务被取消,返回已完成的部分
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

//返回多个任务中的任意结果,其他的取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

优雅关闭

1
2
3
4
5
pool.shutdown();
while(!pool.awaitTermination(1, TimeUnit.SECONDS)) {
//still working
}
//terminated

Executor的职责


执行一个任务不是简单的找一个线程来运行那么简单,涉及到执行策略

  • 任务在哪个线程执行,当前线程还是新开
  • 任务间的执行顺序
  • 任务的并发度
  • 任务队列的容量
  • 系统过载时哪些任务可以舍弃
  • 任务执行前后的额外逻辑
    所以通常并不手动创建Thread执行任务

线程池


单个线程执行所有任务,吞吐率太差。而每个任务一个线程,资源压力大。线程池成为了比价经济的选择。
线程池一般基于生产者消费者模型,阻塞队列实现,根据不同的策略提供ExecutorService规定的方法
Executors作为一个工厂,提供一些线程池的标准实现
基础的线程池,单个任务队列

Executors.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//线程的数量固定(最大最小相同,空闲存活时间0),不管任务量,线程创建后不会销毁
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

//只包含单个线程
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

//动态数量,没有可用线程就新建,线程1分钟不用就删除,增删没有限制,核心数量0即长期空闲不占空间
//实现采用了SynchronousQueue,并没有容量,效果相当于无法加入队列,同步等待取走,这样迫使线程池创建新线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

实现延时或周期执行的线程池
Timer可以实现类似功能,但是线程池全面优于定时器

  • 每个Timer实例只开一个线程,只适合能够快速完成的任务,否则没执行完会影响后续,使用线程池可以开多个线程
  • Timer没有错误处理机制,一旦出错,线程中止,所有任务无法运行,而线程池可以增补新线程
Executors.java
1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

线程池可以提供灵活调度

  • schedule: 一次性延迟执行
  • scheduleWithFixedDelay:周期执行,前一个任务结束后间隔固定时间后发起新任务
  • scheduleAtFixedRate:周期执行,不管前一个任务是否完成,以固定速率发起新任务
ScheduledExecutorService.java
1
2
3
4
5
6
7
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
}

Java8新增多队列版的工作窃取线程池,底层机制ForkJoinPool由Java7实现
多线程同时访问单一任务队列,需要同步。为了减少竞争,分拆成多个任务队列,每个线程专属,平常无需同步
分拆后,为了解决线程任务进度不同的问题,采用任务窃取方式,当一个线程运行完所有任务时,尝试从其他线程队列拿任务
使用双端队列进一步降低竞争,线程访问自己队列从头部拿,窃取其他线程队列从尾部拿

Executors.java
1
2
3
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}

上述工厂方法只是提供了默认配置下快捷的创建方法,底层实际依托了两个核心类ThreadPoolExecutorForkJoinPool
使用Executors创建ThreadPoolExecutor线程池有一定风险,容易资源耗尽

  • 线程最大数量固定时,任务队列长度不加限制, java.lang.OutOfMemoryError: heap space
  • 线程数量动态时,最大线程数目不加限制, java.lang.OutOfMemoryError: unable to create new native thread

自己创建能更好的控制线程池

1
2
3
4
5
6
7
8
9
10
11
public ThreadPoolExecutor(
int corePoolSize, //即使空闲也要维护的最少线程数
int maximumPoolSize, //最大线程数
long keepAliveTime, //线程超过最少数量时,空闲时存活时间
TimeUnit unit, //销毁时间单位
BlockingQueue<Runnable> workQueue, //待执行任务阻塞队列
ThreadFactory threadFactory, //创建线程的工厂
RejectedExecutionHandler handler //线程数限制或队列长度限制时的拒绝处理器
){
//...
}

构建ThreadPoolExecutor并没有参数控制队列的容量,需要队列自己控制,比如ArrayBlockingQueueLinkedListBlockingQueue都可以构造时传入容量参数

对于ForkJoinPool来说,无特殊需求一般不进行创建
类内静态初始化块会创建一个内部的线程池,这个线程池不能被手动关闭,一般异步任务都可以使用这个线程池进行

ForkJoinPool.java
1
2
3
public static ForkJoinPool commonPool() {
return common;
}

ThreadPoolExecutor实现细节


状态控制

线程池一共5状态

  • Shutdown对应shutdown方法,所有任务执行完之后进入Tidying状态
  • Stop对应shutdownNow方法, 没有执行中任务并且队列空后进入Tidying状态
1
2
3
           /  Shutdown \
Running -- -- Tidying -- Terminated
\ Stop /

采用一个原子整数表示线程池状态,好处是避免了多个状态变量,容易同步

ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//状态控制变量,初始状态是0个线程的运行态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//高位前3位记录状态,剩下位数用来统计线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
//构造一个掩码,前三位是0,后面全是1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

//前3位最多容量8,实际存储了五个状态
private static final int RUNNING = -1 << COUNT_BITS;
//不接受新任务,已有任务执行完
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接受新任务,抛弃已有任务
private static final int STOP = 1 << COUNT_BITS;
//无任务运行,队列为空
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

提供了相应的操作方法,便于提取各部分的值

ThreadPoolExecutor.java
1
2
3
4
5
//使用掩码分别提取前三位状态和后面线程数
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
//将两个数字合并成状态
private static int ctlOf(int rs, int wc) { return rs | wc; }

工作线程


线程池底层使用Set维护线程

ThreadPoolExecutor.java
1
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker是AQS子类,为线程提供锁功能,使用AQS内部状态表示加解锁
Worker同时也是个Runnable, 运行它的线程是Worker构造时基于自身创建出来的,即建立Worker时就同时建出了线程
成员包含线程,第一个任务,和完成任务数统计

ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
//工作线程的初始任务,可以是null
Runnable firstTask;
volatile long completedTasks;

Worker(Runnable firstTask) {
//初始化设置成中断状态
setState(-1);
this.firstTask = firstTask;
//使用工厂以自身为逻辑创建一个线程
this.thread = getThreadFactory().newThread(this);
}

//代理给外部逻辑实现
public void run() {
runWorker(this);
}

//判断是否上锁
protected boolean isHeldExclusively() {
return getState() != 0;
}

//上锁,状态置为1
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//解锁,状态置0
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//...
}

工作线程的职责是不断从队列中取任务执行

ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //设置为正常工作状态
boolean completedAbruptly = true;
try {
//循环获取任务,直到无法获取再退出
while (task != null || (task = getTask()) != null) {
//执行前加锁
w.lock();
//响应中断
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
//预处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//后处理
afterExecute(task, thrown);
}
} finally {
task = null;
//完成统计并解锁
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

//预处理和后处理是两个钩子方法,默认无动作,自定义线程池实现可以嵌入逻辑
protected void beforeExecute(Runnable r, Throwable t) { }
protected void afterExecute(Runnable r, Throwable t) { }

获取任务同样是死循环,获取任务的方式是阻塞
这个方法如果返回null,就会导致外围停止,效果是线程退出

ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

//死循环不断尝试获取
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//已经进入关闭状态,无法获取任务,返回null外围即停止,效果是线程退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);
//已经超过核心线程数,开启定时功能
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//多余非核心线程空闲,返回线程退出
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//超过核心线程就不能无限期阻塞等待,会导致过多资源,采取定时等待以便及时退出
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
//成功获取任务
if (r != null)
return r;
//定时时间内没任务,超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

自动调节机制

execute是核心方法,其他运行方式比如submit底层也是execute
初始时没有线程,即使核心线程也不会提前创建,而是任务驱动。

  • 如果没达到核心线程数
    即使线程池有空闲也新建线程执行
  • 如果在核心数量和最大数量之间
    只有队列满了才会再开新线程
  • 超过核心数量的线程空闲一定时间会被销毁
    可以配置allowCoreThreadTimeOut让核心线程空闲也可以进行销毁
  • 如果队列满了,线程也达到最大,就按拒绝策略处理
ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//小于核心,新建线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//队列没满,成功插入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//插入后再次检查状态,如果不运行就从队列中删除,拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
//还在运行但是没有线程,新建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
//队列满,尝试新建线程执行,失败就拒绝任务
} else if (!addWorker(command, false))
reject(command);
}

增加线程逻辑

ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private boolean addWorker(Runnable firstTask, boolean core) {
//循环体之前放置标签,方便跳出多层循环,Goto操作
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//判断是否超过线程数量限制,区分是否为核心模式
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS尝试增长线程数,成功后跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;

//数字增长失败,重新读取状态
c = ctl.get();
//状态发生变化,重新外部循环判断
if (runStateOf(c) != rs)
continue retry;
//内部循环重新尝试增长数字
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//以这个任务新建一个Worker
w = new Worker(firstTask);
//Worker内部构造的线程就是运行Worker的线程
final Thread t = w.thread;
if (t != null) {
//尝试将线程加入线程池,敏感操作加锁保护
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加入线程池的set中
workers.add(w);
int s = workers.size();
//更新最大线程数量历史
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//成功加入后线程启动
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

所以核心线程并非初始就全部启动,而是任务触发,如果想一次启动可以使用prestartAllCoreThreads方法

拒绝策略


ThreadPoolExecutor中包含四个RejectedExecutionHandler静态内部子类表示预定义的拒绝策略

  • AbortPolicy: 默认策略,直接抛出异常
  • DiscardPolicy:什么都不干,静默丢弃,即放弃最新的
  • DiscardOldestPolicy:移除一个最早的尚未执行的任务,即移除最老的腾出空间
  • CallerRunsPolicy:不给线程池,由当前调用线程执行

注意如果使用submit方法,会返回Future,如果线程池使用了丢弃策略,效果是不做任何处理,即永远没有完成,此时如果使用无等待时间限制的get方法就会导致一直阻塞

数值统计


线程池提供一组接口反映统计信息

  • 当前线程数:getPoolSize
  • 曾经达到的最大线程数:getLargestPoolSize
  • 当前活跃线程数:getActiveCount
  • 接受总任务数:getTaskCount
  • 完成任务数:getCompletedTaskCount

应用策略


  • 隔离线程环境,多个不同配置的线程池对应不同功能,防止功能间干扰
  • 计算密集型,线程数设为核数+1即可,多了也没资源
  • IO密集型,线程数可以设为核数2倍+1
  • 更精确算法为线程数量 = 核数/线程工作比 线程工作比 = 工作时间/工作时间+等待时间
  • 必要时可以使用优先级队列,提供插队功能
  • 使用有界队列,防止内存溢出影响外界应用,无界队列下,永远不会增加非核心线程,永远不会执行拒绝策略