线程复用
解决执行大量异步任务时效率和资源配置问题
效率:重用,减少开销,提升响应速度
解耦:线程创建和执行分离,业务不绑定线程
基础接口
Executor
是任务调度的基础抽象,将任务逻辑和如何运行分离
任务逻辑就是纯Runnable
,何时执行、是否多线程等具体执行方式由Executor
负责
Executor.java 1 2 3 public interface Executor { void execute (Runnable command) ; }
ExecutorService
顾名思义进行服务化功能扩展,包含两大块:任务提交和服务关闭
提交submit同时支持Runnable和Callable
ExecutorService.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public interface ExecutorService extends Executor { void shutdown () ; List<Runnable> shutdownNow () ; boolean isShutdown () ; boolean isTerminated () ; boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException ; <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException ; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ; }
优雅关闭
1 2 3 4 5 pool.shutdown(); while (!pool.awaitTermination(1 , TimeUnit.SECONDS)) { }
Executor的职责
执行一个任务不是简单的找一个线程来运行那么简单,涉及到执行策略
任务在哪个线程执行,当前线程还是新开
任务间的执行顺序
任务的并发度
任务队列的容量
系统过载时哪些任务可以舍弃
任务执行前后的额外逻辑
所以通常并不手动创建Thread
执行任务
线程池
单个线程执行所有任务,吞吐率太差。而每个任务一个线程,资源压力大。线程池成为了比价经济的选择。
线程池一般基于生产者消费者模型,阻塞队列实现,根据不同的策略提供ExecutorService
规定的方法
Executors
作为一个工厂,提供一些线程池的标准实现
基础的线程池,单个任务队列
Executors.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1 , 1 ,0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE,60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
实现延时或周期执行的线程池
Timer
可以实现类似功能,但是线程池全面优于定时器
每个Timer实例只开一个线程,只适合能够快速完成的任务,否则没执行完会影响后续,使用线程池可以开多个线程
Timer没有错误处理机制,一旦出错,线程中止,所有任务无法运行,而线程池可以增补新线程
Executors.java 1 2 3 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
线程池可以提供灵活调度
schedule: 一次性延迟执行
scheduleWithFixedDelay:周期执行,前一个任务结束后间隔固定时间后发起新任务
scheduleAtFixedRate:周期执行,不管前一个任务是否完成,以固定速率发起新任务
ScheduledExecutorService.java 1 2 3 4 5 6 7 public interface ScheduledExecutorService extends ExecutorService { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) ; public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); }
Java8新增多队列版的工作窃取线程池,底层机制ForkJoinPool由Java7实现
多线程同时访问单一任务队列,需要同步。为了减少竞争,分拆成多个任务队列,每个线程专属,平常无需同步
分拆后,为了解决线程任务进度不同的问题,采用任务窃取方式,当一个线程运行完所有任务时,尝试从其他线程队列拿任务
使用双端队列进一步降低竞争,线程访问自己队列从头部拿,窃取其他线程队列从尾部拿
Executors.java 1 2 3 public static ExecutorService newWorkStealingPool (int parallelism) { return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null , true ); }
上述工厂方法只是提供了默认配置下快捷的创建方法,底层实际依托了两个核心类ThreadPoolExecutor
和ForkJoinPool
使用Executors
创建ThreadPoolExecutor
线程池有一定风险,容易资源耗尽
线程最大数量固定时,任务队列长度不加限制, java.lang.OutOfMemoryError: heap space
线程数量动态时,最大线程数目不加限制, java.lang.OutOfMemoryError: unable to create new native thread
自己创建能更好的控制线程池
1 2 3 4 5 6 7 8 9 10 11 public ThreadPoolExecutor ( int corePoolSize, //即使空闲也要维护的最少线程数int maximumPoolSize, //最大线程数long keepAliveTime, //线程超过最少数量时,空闲时存活时间TimeUnit unit, //销毁时间单位 BlockingQueue<Runnable> workQueue, //待执行任务阻塞队列 ThreadFactory threadFactory, //创建线程的工厂 RejectedExecutionHandler handler //线程数限制或队列长度限制时的拒绝处理器 ) { }
构建ThreadPoolExecutor
并没有参数控制队列的容量,需要队列自己控制,比如ArrayBlockingQueue
和LinkedListBlockingQueue
都可以构造时传入容量参数
对于ForkJoinPool
来说,无特殊需求一般不进行创建
类内静态初始化块会创建一个内部的线程池,这个线程池不能被手动关闭,一般异步任务都可以使用这个线程池进行
ForkJoinPool.java 1 2 3 public static ForkJoinPool commonPool () { return common; }
ThreadPoolExecutor实现细节
状态控制
线程池一共5状态
Shutdown对应shutdown方法,所有任务执行完之后进入Tidying状态
Stop对应shutdownNow方法, 没有执行中任务并且队列空后进入Tidying状态
1 2 3 / Shutdown \ Running -- -- Tidying -- Terminated \ Stop /
采用一个原子整数表示线程池状态,好处是避免了多个状态变量,容易同步
ThreadPoolExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
提供了相应的操作方法,便于提取各部分的值
ThreadPoolExecutor.java 1 2 3 4 5 private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
工作线程
线程池底层使用Set维护线程
ThreadPoolExecutor.java 1 private final HashSet<Worker> workers = new HashSet<Worker>();
Worker是AQS
子类,为线程提供锁功能,使用AQS
内部状态表示加解锁
Worker同时也是个Runnable, 运行它的线程是Worker构造时基于自身创建出来的,即建立Worker时就同时建出了线程
成员包含线程,第一个任务,和完成任务数统计
ThreadPoolExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } }
工作线程的职责是不断从队列中取任务执行
ThreadPoolExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } } protected void beforeExecute (Runnable r, Throwable t) { }protected void afterExecute (Runnable r, Throwable t) { }
获取任务同样是死循环,获取任务的方式是阻塞
这个方法如果返回null,就会导致外围停止,效果是线程退出
ThreadPoolExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
自动调节机制
execute
是核心方法,其他运行方式比如submit
底层也是execute
初始时没有线程,即使核心线程也不会提前创建,而是任务驱动。
如果没达到核心线程数
即使线程池有空闲也新建线程执行
如果在核心数量和最大数量之间
只有队列满了才会再开新线程
超过核心数量的线程空闲一定时间会被销毁
可以配置allowCoreThreadTimeOut
让核心线程空闲也可以进行销毁
如果队列满了,线程也达到最大,就按拒绝策略处理
ThreadPoolExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
增加线程逻辑
ThreadPoolExecutor.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
所以核心线程并非初始就全部启动,而是任务触发,如果想一次启动可以使用prestartAllCoreThreads
方法
拒绝策略
ThreadPoolExecutor
中包含四个RejectedExecutionHandler
静态内部子类表示预定义的拒绝策略
AbortPolicy: 默认策略,直接抛出异常
DiscardPolicy:什么都不干,静默丢弃,即放弃最新的
DiscardOldestPolicy:移除一个最早的尚未执行的任务,即移除最老的腾出空间
CallerRunsPolicy:不给线程池,由当前调用线程执行
注意如果使用submit
方法,会返回Future
,如果线程池使用了丢弃策略,效果是不做任何处理,即永远没有完成,此时如果使用无等待时间限制的get
方法就会导致一直阻塞
数值统计
线程池提供一组接口反映统计信息
当前线程数:getPoolSize
曾经达到的最大线程数:getLargestPoolSize
当前活跃线程数:getActiveCount
接受总任务数:getTaskCount
完成任务数:getCompletedTaskCount
应用策略
隔离线程环境,多个不同配置的线程池对应不同功能,防止功能间干扰
计算密集型,线程数设为核数+1即可,多了也没资源
IO密集型,线程数可以设为核数2倍+1
更精确算法为线程数量 = 核数/线程工作比
线程工作比 = 工作时间/工作时间+等待时间
必要时可以使用优先级队列,提供插队功能
使用有界队列,防止内存溢出影响外界应用,无界队列下,永远不会增加非核心线程,永远不会执行拒绝策略