前一篇文章《如何选择线程数 》讲了如何决定线程池中线程个数,这篇文章讨论“如何选择工作队列”。
2、当核心线程数达到corePoolSize 时,后续提交的都会进BlockingQueue中排队;
4、当线程总数达到maximumPoolSize 时,后续提交的任务都会被RejectedExecutionHandler拒绝。
1、BlockingQueue 线程池中工作队列由BlockingQueue实现类提供功能,BlockingQueue定义了这么几组方法:
Summary of BlockingQueue methods Throws exception Special value Blocks Times out Insert add(e) offer(e) put(e) offer(e, time, unit) Remove remove() poll() take() poll(time, unit) Examine element() peek() not applicable not applicable
阻塞队列是最典型的“生产者消费者” 模型:
生产者调用put()方法将生产的元素入队,消费者调用take()方法; 当队列满了,生产者调用的put()方法会阻塞,直到队列有空间可入队; 当队列为空,消费者调用的get()方法会阻塞,直到队列有元素可消费;
事实上,工作线程的超时销毁是调用offer(e, time, unit)
2、JDK提供的阻塞队列实现 JDK中提供了以下几个BlockingQueue实现类:
这是一个由数组实现 的容量固定 的有界阻塞队列。这个队列的实现非常简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); } private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
这是一个非常有意思的集合,更准确的说它并不是一个集合容器,因为它没有容量 。你可以“偷偷地”把它看作new ArrayBlockingQueue(0)
正如SynchronousQueue的名字所描述一样——“同步队列”,它专门用于生产者线程与消费者线程之间的同步 :
因为它任何时候都是空的,所以消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回。 同样的,你也可以认为任何时候都是满的,所以生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。 另外还有几点需要注意:
SynchronousQueue不能遍历,因为它没有元素可以遍历; 所有的阻塞队列都不允许插入null元素,因为当生产者生产了一个null的时候,消费者调用poll()返回null,无法判断是生产者生产了一个null元素,还是队列本身就是空。 CachedThreadPool使用的就是同步队列 :
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
这是一个由单链表实现 的默认无界 的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。
按照官方文档的说法LinkedBlockingQueue是一种可选有界(optionally-bounded)阻塞队列 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); } public static ExecutorService newSingleThreadExecutor (ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory)); }
2.4、其他队列 DelayQueue和PriorityBlockingQueue底层都是使用二叉堆实现 的优先级阻塞队列 。
前者要求队列中的元素实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来; 后者对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务。 当我们提交的任务有优先顺序时可以考虑选用这两种队列
事实上ScheduledThreadPoolExecutor内部实现了一个类似于DelayQueue的队列 。
3、让生产者阻塞的线程池 前面说到CachedThreadPool和FixedThreadPool都有可能导致内存溢出,前者是由于线程数过多,后者是由于队列任务过多。而究其根本就是因为任务生产速度远大于线程池处理任务的速度。
3.1、重写BlockingQueue的offer 这种处理方式是将原来非阻塞的offer覆盖,使用阻塞的put方法实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class ThreadPoolTest { private static class Task implements Runnable { private int taskId; Task(int taskId) { this .taskId = taskId; } @Override public void run () { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException ignore) { } System.out.println("task " + taskId + " end" ); } } public static void main (String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor (2 , 2 , 0 , TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable>(2 ) { @Override public boolean offer (Runnable runnable) { try { super .put(runnable); } catch (InterruptedException e) { e.printStackTrace(); } return true ; } }); threadPool.submit(new Task (1 )); System.out.println("task 1 submitted" ); threadPool.submit(new Task (2 )); System.out.println("task 2 submitted" ); threadPool.submit(new Task (3 )); System.out.println("task 3 submitted" ); threadPool.submit(new Task (4 )); System.out.println("task 4 submitted" ); threadPool.submit(new Task (5 )); System.out.println("task 5 submitted" ); threadPool.submit(new Task (6 )); System.out.println("task 6 submitted" ); threadPool.shutdown(); } }
3.2、重写拒绝策略 在介绍第二种方式之前,先简单介绍JDK中提供了四种拒绝策略:
AbortPolicy——抛出RejectedExecutionException异常的方式拒绝任务。 DiscardPolicy——什么都不干,静默地丢弃任务 DiscardOldestPolicy——把队列中最老的任务拿出来扔掉 CallerRunsPolicy——在任务提交的线程把任务给执行了 ThreadPoolExecutor默认使用AbortPolicy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class ThreadPoolTest { private static class Task implements Runnable { private int taskId; Task(int taskId) { this .taskId = taskId; } @Override public void run () { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException ignore) { } System.out.println("task " + taskId + " end" ); } } private static class BlockCallerPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main (String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor (2 , 2 , 0 , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 ), new BlockCallerPolicy ()); threadPool.submit(new Task (1 )); System.out.println("task 1 submitted" ); threadPool.submit(new Task (2 )); System.out.println("task 2 submitted" ); threadPool.submit(new Task (3 )); System.out.println("task 3 submitted" ); threadPool.submit(new Task (4 )); System.out.println("task 4 submitted" ); threadPool.submit(new Task (5 )); System.out.println("task 5 submitted" ); threadPool.submit(new Task (6 )); System.out.println("task 6 submitted" ); threadPool.shutdown(); } }
4、Tomcat中的线程池 作为一个最常用的Java应用服务器之一,Tomcat中线程池还是值得我们借鉴学习的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class StandardThreadExecutor extends LifecycleMBeanBase implements Executor , ResizableExecutor { protected int threadPriority = Thread.NORM_PRIORITY; protected boolean daemon = true ; protected String namePrefix = "tomcat-exec-" ; protected int maxThreads = 200 ; protected int minSpareThreads = 25 ; protected int maxIdleTime = 60000 ; ... protected boolean prestartminSpareThreads = false ; protected int maxQueueSize = Integer.MAX_VALUE; protected void startInternal () throws LifecycleException { taskqueue = new TaskQueue (maxQueueSize); TaskThreadFactory tf = new TaskThreadFactory (namePrefix,daemon,getThreadPriority()); executor = new ThreadPoolExecutor ( getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS, taskqueue, tf); executor.setThreadRenewalDelay(threadRenewalDelay); if (prestartminSpareThreads) { executor.prestartAllCoreThreads(); } taskqueue.setParent(executor); setState(LifecycleState.STARTING); } ... }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class ThreadPoolExecutor extends java .util.concurrent.ThreadPoolExecutor { private final AtomicInteger submittedCount = new AtomicInteger (0 ); private final AtomicLong lastContextStoppedTime = new AtomicLong (0L ); private final AtomicLong lastTimeThreadKilledItself = new AtomicLong (0L ); @Override protected void afterExecute (Runnable r, Throwable t) { submittedCount.decrementAndGet(); if (t == null ) { stopCurrentThreadIfNeeded(); } } @Override public void execute (Runnable command) { execute(command,0 ,TimeUnit.MILLISECONDS); } public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException ("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException (x); } } else { submittedCount.decrementAndGet(); throw rx; } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class TaskQueue extends LinkedBlockingQueue <Runnable> { private volatile ThreadPoolExecutor parent = null ; public boolean force (Runnable o) { if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException ("Executor not running, can't force a command into the queue" ); return super .offer(o); } public boolean force (Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException ("Executor not running, can't force a command into the queue" ); return super .offer(o,timeout,unit); } @Override public boolean offer (Runnable o) { if (parent==null ) return super .offer(o); if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super .offer(o); if (parent.getSubmittedCount() < parent.getPoolSize()) return super .offer(o); if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false ; return super .offer(o); } }
Tomcat的ThreadPoolExecutor使用的TaskQueue,是无界的LinkedBlockingQueue,但是通过taskQueue的offer方法覆盖了LinkedBlockingQueue的offer方法,改写了规则,使得线程池能在任务较多的情况下增长线程池数量——JDK是先排队再涨线程池,Tomcat则是先涨线程池再排队。 Tomcat的ThreadPoolExecutor改写了execute方法,当任务被reject时,捕获异常,并强制入队。 参考链接:
支持生产阻塞的线程池 :http://ifeve.com/blocking-threadpool-executor/
