系列文章:
1. 为什么要使用线程池 线程创建与销毁都耗费时间,对于大量的短暂任务 如果仍使用“创建->执行任务->销毁”的简单模式,将极大地降低线程的使用效率(一个线程仅仅处理一个短暂的任务就被销毁了)。在这种情况下,为了提高线程的使用效率,我们使用缓存池的策略让线程执行任务后不立即销毁而是等待着处理下一个任务。
2. 使用Executors工具类创建线程池 Executors是线程池框架提供给我们的创建线程池的工具类,它里面提供了以下创建几类线程池的方法。
1 2 3 4 5 6 7 8 9 10 11 public static ExecutorService newFixedThreadPool () ;public static ExecutorService newSingleThreadExecutor () ;public static ExecutorService newCachedThreadPool () ;public static ScheduledExecutorService newScheduledThreadPool () ;public static ScheduledExecutorService newSingleThreadScheduledExecutor () ;
通过查看这几个方法的源码发现:前三个方法new了ThreadPoolExecutor
对象,而后面两个方法new了ScheduledThreadPoolExecutor
对象。
整个线程池框架的类继承图如下,其中ThreadPoolExecutor是本文的核心,ScheduleThreadPoolExecutor
将放到后一篇文章中讲。
JDK文档 建议一般情况使用Executors去创建线程池
其中三个核心接口的方法如下:
3. 构造ThreadPoolExecutor对象 java.util.concurrent.ThreadPoolExecutor
类是线程池中最核心的类之一,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面分析一下ThreadPoolExecutor类的核心源码的具体实现。
在ThreadPoolExecutor类中提供了四个构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { } ... }
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService(实现ExecutorService接口)类,并提供了四个构造器,前三个构造器最终都会辗转调用第四个构造器。
下面解释一下第四个构造器中各个参数的含义:
corePoolSize:核心池的大小。
核心池中的线程会一致保存在线程池中(即使线程空闲),除非调用allowCoreThreadTimeOut
方法允许核心线程在空闲后一定时间内销毁,该时间由构造方法中的keepAliveTime
和unit
参数指定; 在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()
或者prestartCoreThread()
方法,从这两个方法的名字就可以看出,是“预创建线程” 的意思,即在没有任务到来之前就创建corePoolSize个线程(prestartAllCoreThreads)或者一个线程(prestartCoreThread); maximumPoolSize:线程池允许的最大线程数。这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把新加入的任务放到缓存队列当中 ,缓存队列由构造方法中的workQueue
参数指定,如果入队失败(队列已满)则尝试创建临时线程 ,但临时线程和核心线程的总数不能超过maximumPoolSize,当线程总数达到maximumPoolSize后会拒绝新任务 ;所以有两种方式可以让任务绝不被拒绝:
① 将maximumPoolSize设置为Integer.MAX_VALUE
(线程数不可能达到这个值),CachedThreadPool
就是这么做的;
② 使用无限容量的阻塞队列(比如LinkedBlockingQueue),所有处理不过来的任务全部排队去,FixedThreadPool
就是这么做的。
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。
默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用——当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会被销毁,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(true)
方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0; unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
1 2 3 4 5 6 7 TimeUnit.DAYS; TimeUnit.HOURS; TimeUnit.MINUTES; TimeUnit.SECONDS; TimeUnit.MILLISECONDS; TimeUnit.MICROSECONDS; TimeUnit.NANOSECONDS;
并发库中所有时间表示方法都是以TimeUnit
枚举类作为单位
workQueue:一个阻塞队列(BlockingQueue接口的实现类),用来存储等待执行的任务,一般来说,这里的阻塞队列有以下几种选择:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ArrayBlockingQueue LinkedBlockingQueue SynchronousQueue PriorityBlockingQueue DelayQueue
有关BlockingQueue的内容可以参考《Java集合框架总结与巩固》
threadFactory:线程工厂,主要用来创建线程;默认情况都会使用Executors工具类中定义的默认工厂类DefaultThreadFactory
。可以实现ThreadFactory接口来自己控制创建线程池的过程(比如设置创建线程的名字、优先级或者是否为Deamon守护线程)
handler:表示当拒绝处理任务时的策略,有以下四种取值(默认为AbortPolicy):
1 2 3 4 ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
可通过实现RejectedExecutionHandler接口来自定义任务拒绝后的处理策略
4. 线程池的状态转换 ThreadPoolExecutor类中有一个ctl
属性,该属性是AtomicInteger类型,本质上就是32bit的int类型。这个32bit字段中存储了两个数据:
其中三个高字节位存储了线程池当前的运行状态,线程池状态有以下几个:
1 2 3 4 5 6 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:接受新任务并处理队列中的任务 SHUTDOWN:不接受新任务但处理队列中的任务 STOP:不接受新任务也不处理队列中的任务并终断正在处理中的任务 TIDYING:所有任务已经终止,workerCount等于0,线程池切换到TIDYING后将会执行terminated()
钩子方法 TERMINATED:terminated()
方法已执行完成 整个过程的状态转换图如下:
我们可以调用的两个改变线程池状态的方法分别是:
1 2 3 4 public void shutdown () ;public List<Runnable> shutdownNow () ;
另外ThreadPoolExecutor提供了一些方法来查询这些状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 public boolean isShutdown () { return ! isRunning(ctl.get()); } public boolean isTerminating () { int c = ctl.get(); return ! isRunning(c) && runStateLessThan(c, TERMINATED); } public boolean isTerminated () { return runStateAtLeast(ctl.get(), TERMINATED); }
5. 任务的提交 任务提交主要有三种方式:
execute(Runnable command):定义在Executor接口中 submit的三个重载方法:定义在ExecutorService接口中 invoke(invokeAll,invokeAny)提交方式:定义在ExecutorService接口中 5.1 submit提交方式 submit方法的实现源码在ThreadPoolExecutor的基类AbstractExecutorService中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask <T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask <T>(callable); } public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Runnable task, T result) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
submit最终都会调用execute方法去执行任务,区别在于submit方法返回一个FutureTask对象(顾名思义FutrueTask就是未来将执行的任务,可以通过FutureTask对象获取任务执行的结果)。
5.1.1 FutrueTask FutureTask实现Future接口,Future接口及其相关类继承图如下:
FutureTask类中定义了如下的状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private volatile int state;private static final int NEW = 0 ;private static final int COMPLETING = 1 ;private static final int NORMAL = 2 ;private static final int EXCEPTIONAL = 3 ;private static final int CANCELLED = 4 ;private static final int INTERRUPTING = 5 ;private static final int INTERRUPTED = 6 ;
FutureTask的状态转换图如下(其中绿色标注的是外部可调用的方法,其他方法均有内部调用,RUNNING状态是我附加的状态,表示该任务已经被运行):
Future接口定义了以下几个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 boolean cancel (boolean mayInterruptIfRunning) ;boolean isCancelled () { return state >= CANCELLED; }boolean isDone () { return state != NEW; }V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class ThreadPoolExecutorTest { public static void main (String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor (4 , 4 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue <>()); Scanner sc = new Scanner (System.in); System.out.println("----------------------------------------" ); System.out.println("输入0使用submit提交任务到线程池" ); System.out.println("输入正整数使用取消上次提交的任务并允许中断" ); System.out.println("输入负整数使用取消上次提交的任务不允许中断" ); System.out.println("输入其他字符关闭线程池并退出程序" ); System.out.println("----------------------------------------" ); Future<String> task = null ; while (true ) { if (sc.hasNextInt()) { int command = sc.nextInt(); if (command == 0 ) { task = threadPool.submit(new Callable <String>() { @Override public String call () throws Exception { printMessage("Task start" ); Thread.sleep(1800 ); printMessage("Task finish" ); return "result" ; } }); } else if (command > 0 && task != null ) { boolean success = task.cancel(true ); System.out.println(task + ":task.cancel(true)->" + (success ? "success" : "failed" )); } else if (command < 0 && task != null ) { boolean success = task.cancel(false ); System.out.println(task + ":task.cancel(false)->" + (success ? "success" : "failed" )); } } else { threadPool.shutdownNow(); break ; } } } private static void printMessage (String msg) { String name = Thread.currentThread().getName(); System.out.println(name + ":" + msg); } }
5.2 execute提交方式 submit最终都会调用execute方法去执行任务,所以我们重点需要分析execute方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
根据源码可以得出以下执行流程:
如果正在运行的线程数小于corePoolSize,Executor总是添加一个新线程,而不是任务排队。 如果已有corePoolSize或更多线程在运行,则Executor总是优先选择任务排队而不会添加新线程。 如果任务请求不能排队,则创建一个新线程,但是线程总数量不超过maximumPoolSize,如果不能创建新线程任务将会被拒绝。 这里有几点需要重点注意:
第五行workerCountOf(c) < corePoolSize
:如果核心线程没有创建完则新任务交由新创建的核心线程处理,但是我们如果调用prestartAllCoreThreads
方法预先把所有核心线程创建完成,则这一分支不会执行。 第十三行workQueue.offer(command)
:offer方法不会阻塞(名不符其实,阻塞队列不阻塞呵呵),如果入队成功会立即返回true否则返回false。入队成功与否取决于workQueue
的性质。比如:①单链表实现的LinkedBlockingQueue 默认容量为Integer.MAX_VALUE
(等价于无限容量),所以此时该方法不会返回false也不会创建临时线程(都去排队去了),当然如果创建LinkedBlockingQueue时指定了capacity,offer方法就可能返回false,但我们一般不会这么干;②而数组实现的ArrayBlockingQueue 不允许扩容,所以队列已满则会返回false进入二十三行尝试创建临时线程,如果总线程数不超过maximumPoolSize则能创建临时线程,但会导致后来的任务没排队反而能得到执行(不公平),如果超出maximumPoolSize创建临时线程失败则会拒绝任务,两种情况都不好,所以ArrayBlockingQueue用的不是很多;③使用小顶堆实现的PriorityBlockingQueue
会根据任务的优先级来选择执行顺序;④使用没有容量的同步队列SynchronousQueue
,如果没有空闲线程接收任务会立即返回false,所以大部分情况会创建新的临时线程。 第十七行workerCountOf(recheck) == 0
:这句判断间接表示核心线程数为0的情况,核心线程数为0只会发生在两种条件下:①线程池本身已经指定核心数为0(构造方法指定或setCorePoolSize
方法指定),②调用allowCoreThreadTimeOut
方法允许核心线程超时导致核心线程数位0。 5.3 invoke提交方式 ExecutorService中定义了两组invoke方法:
1 2 3 4 5 6 7 8 9 10 11 12 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;<T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
invokeAny取得率先完成的任务的返回值,当第一个任务结束后,会调用cancel方法取消其它任务。 invokeAll等所有任务执行完毕后,取得全部任务的结果值。 invokeAll存在的问题
invokeAll有个严重的问题是,任务执行后不会抛出任务执行的异常。调用方需要手动调用Future.get()
方法触发异常,而且FutureTask的异常是被ExecutionException包裹过的,所以调用get
方法的时候是无法捕获到内部抛出的异常类型,要么通过捕获ExecutionException异常拿到它内部包装的异常,要么直接捕获所有的Exception。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null ) throw new NullPointerException (); ArrayList<Future<T>> futures = new ArrayList <Future<T>>(tasks.size()); boolean done = false ; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0 , size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true ; return futures; } finally { if (!done) for (int i = 0 , size = futures.size(); i < size; i++) futures.get(i).cancel(true ); } }
5.3.1 等待任务完成 invokeAll方式会导致调用线程阻塞直到所有任务完成,由于不知道哪个任务先执行完毕,使用这种方式效率不是很高。所以Java5还提供了一个CompletionService
接口给我们用。CompletionService
目前只有一个实现类——ExecutorCompletionService
。
ExecutorCompletionService实际只是维护了一个队列,然后将完成的任务往队列里放,这个实现主要是依赖FutureTask的一个钩子方法done:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class ExecutorCompletionService <V> implements CompletionService <V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; private class QueueingFuture extends FutureTask <Void> { QueueingFuture(RunnableFuture<V> task) { super (task, null ); this .task = task; } protected void done () { completionQueue.add(task); } private final Future<V> task; } ... public Future<V> submit (Callable<V> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture (f)); return f; } public Future<V> submit (Runnable task, V result) { if (task == null ) throw new NullPointerException (); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture (f)); return f; }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 void solve (Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService <Result>(e); for (Callable<Result> s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0 ; i < n; ++i) { Result r = ecs.take().get(); if (r != null ) doSomething(r); } }
invokeAny就是通过CompletionService
实现,从而拿到第一个执行完成的任务的结果。
6. 线程如何创建 刚刚的execute提交任务中调用到了addWorker方法来创建线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
7. 线程如何执行 线程执行我们肯定得找到run方法,我们看一下Worker类是怎么定义的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } ... }
ThreadPoolExecutor.runWorker方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask () { boolean timedOut = false ; for (;;) { ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
8. 线程池的其他变量和方法 其他成员变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock (); private final HashSet<Worker> workers = new HashSet <Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize;
其他成员方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public void setCorePoolSize (int corePoolSize) ;public int getCorePoolSize () ;public void setMaximumPoolSize (int maximumPoolSize) ;public int getMaximumPoolSize () ;public int getPoolSize () ;public int getLargestPoolSize () ;public int getActiveCount () ;public long getTaskCount () ;public long getCompletedTaskCount () ;public void setThreadFactory (ThreadFactory threadFactory) ;public ThreadFactory getThreadFactory () ;public void setRejectedExecutionHandler (RejectedExecutionHandler handler) ;public RejectedExecutionHandler getRejectedExecutionHandler () ;public void allowCoreThreadTimeOut (boolean value) ;public boolean allowsCoreThreadTimeOut () ;public void setKeepAliveTime (long time, TimeUnit unit) ;public long getKeepAliveTime (TimeUnit unit) ;public boolean prestartCoreThread () ;public int prestartAllCoreThreads () ;public BlockingQueue<Runnable> getQueue () ;public boolean remove (Runnable task) ;public void purge () ;public boolean awaitTermination (long timeout, TimeUnit unit) ;
9. 扩展ThreadPoolExecutor的功能 在ThreadPoolExecutor类中还有三个protected属性的空方法:
1 2 3 4 5 6 7 8 9 10 protected void beforeExecute (Thread t, Runnable r) { }protected void afterExecute (Runnable r, Throwable t) { }protected void terminated () { }
根据JDK文档中的说明,我们可以重载这三个钩子方法来对ThreadPoolExecutor进行扩展。下面是官方文档提供的扩展示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class PausableThreadPoolExecutor extends ThreadPoolExecutor { public PausableThreadPoolExecutor (...) { super (...); } private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock (); private Condition unpaused = pauseLock.newCondition(); protected void beforeExecute (Thread t, Runnable r) { super .beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause () { pauseLock.lock(); try { isPaused = true ; } finally { pauseLock.unlock(); } } public void resume () { pauseLock.lock(); try { isPaused = false ; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }
10. 更多种类的线程池MoreExecutors Guava 是Google提供的一个最受欢迎的通用工具包。它提供了很多并发工具,其中包括几个便捷的ExecutorService
实现类,这些实现类我们无法直接访问,Guava提供了一个唯一的访问入口——MoreExecutors 工具类。
1、 应用执行完成(主线程以及其他非守护线程执行完)后自动关闭的线程池
1 2 3 4 5 6 7 8 9 10 public static ExecutorService getExitingExecutorService ( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) ;public static ExecutorService getExitingExecutorService (ThreadPoolExecutor executor) ;public static ScheduledExecutorService getExitingScheduledExecutorService ( ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) ;public static ScheduledExecutorService getExitingScheduledExecutorService ( ScheduledThreadPoolExecutor executor)
这个工具方法修改传入的线程池的ThreadFactory使其生成守护线程,但是线程池使用守护进程会导致有些任务只执行了一半,为了让线程池更加可控,所以Guava使用Runtime.getRuntime().addShutdownHook(hook)
注册了一个等待线程:
1 2 3 4 5 6 7 8 9 10 11 12 addShutdownHook( MoreExecutors.newThread( "DelayedShutdownHook-for-" + service, new Runnable () { @Override public void run () { try { service.shutdown(); service.awaitTermination(terminationTimeout, timeUnit); } catch (InterruptedException ignored) { } } }));
示例:
1 2 3 4 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5 );ExecutorService executorService = MoreExecutors.getExitingExecutorService( executor, 100 , TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true ) {} });
上面的代码中,如果我们直接在线程池中执行无限期的任务,会导致JVM无限期挂起。
当我们使用getExitingExecutorService
进行包装后,程序执行完后,如果线程池100毫秒内没有执行完任务,将会直接关闭线程池。
2、当前线程执行的线程池
有时可能需要在当前线程或线程池中执行任务,具体取决于某些条件,为了使用统一的接口可能需要在当前线程执行的Executor。虽然这个实现起来并不难,但还是需要编写样板代码。所幸Guava已经提供了这样的方法。
1 2 3 4 Executor executor = MoreExecutors.directExecutor();ExecutorService executor = MoreExecutors.newDirectExecutorService();
3、使用ListenableFuture异步增强的线程池
使用submit提交方式将会返回一个实现了Future接口的FutureTask对象,我们通过调用Future.get
方法获取任务的执行结果,如果任务没有执行完,get方法将会导致调用线程的阻塞。为此Guava为我们提供了一个增强型的Future接口——ListenableFuture
,能以异步回调的方式获取执行结果。
为了能让线程池返回ListenableFuture
,MoreExecutors中提供了两个包装方法:
1 2 public static ListeningExecutorService listeningDecorator (ExecutorService delegate) ;public static ListeningScheduledExecutorService listeningDecorator (ScheduledExecutorService delegate)
实际上ListenableFutureTask
和上面的ExecutorComletionService一样也是通过实现FutureTask的done方法实现。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ExecutorService delegate = Executors.newCachedThreadPool();ListeningExecutorService executor = MoreExecutors.listeningDecorator(delegate);final ListenableFuture future = executor.submit(new Callable <Integer>() { public Integer call () throws Exception { int result = 0 ; Thread.sleep(1000 ); return result; } }); future.addListener(new Runable () { public void run () { System.out.println("result:" + future.get()); } }, MoreExecutors.directExecutor()); Futures.addCallback(future, new FutureCallback <Integer>() { public void onSuccess (Integer result) { System.out.println("result:" + result); } public void onFailure (Throwable t) { t.printStackTrace(); } }, MoreExecutors.directExecutor());
关于ListenableFuture的更多细节用法,参考Guava的wiki
本作品采用 知识共享署名 4.0 国际许可协议 进行许可。
转载时请注明原文链接 :https://blog.hufeifei.cn/2017/06/Java/multithread/06-ThreadPoolExecutor/