前篇:《Java多线程复习与巩固(六)—线程池ThreadPoolExecutor详解》
1. 为什么要使用ScheduledThreadPoolExecutor
在《Java多线程复习与巩固(二)—线程相关工具类Timer和ThreadLocal的使用》提到过,Timer可以实现指定延时调度任务,还可以实现任务的周期性执行。但是Timer中的所有任务都是由一个TimerThread执行,也就是说Timer是单线程执行任务。单线程执行任务有一个致命的缺点:当某些任务的执行特别耗时,后续的任务无法在预定的时间内得到执行,前一个任务的延迟或异常将影响到后续的任务;另外TimerThread没有做异常处理,一个任务出现异常将会导致整个Timer线程结束。
由于Timer单线程的种种缺点,这个时候我们就需要让线程池去执行这些任务。
2. 使用Executors工具类
Executors是线程池框架提供给我们的创建线程池的工具类,FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是上一篇文章中的ThreadPoolExecutor对象。
他还有另外两个方法:
1 | // 创建(可计划的)任务延时执行线程池 |
从下面的继承图我们知道ScheduledThreadPoolExecutor就是ScheduledExecutorService接口的实现类。
3. 构造ScheduledThreadPoolExecutor对象
先看一下ScheduledThreadPoolExecutor的几个构造函数
1 | public class ScheduledThreadPoolExecutor |
从上面的代码可以看出ScheduledThreadPoolExecutor都是直接调用的父类ThreadPoolExecutor的构造函数。
我们结合上一篇对ThreadPoolExecutor构造参数的解释对ScheduledThreadPoolExecutor的几个参数进行分析,主要有以下几个参数比较特殊:
- maximumPoolSize:线程池允许的最大线程数为
Integer.MAX_VALUE
,也就意味着ScheduledThreadPoolExecutor对线程数没有限制。这个是必须的,因为一旦对线程数有了限制,必定会存在任务等待调度的情况,有等待就可能会存在任务延时,所以最大线程数不能有限制。 - keepAliveTime和unit:0 NANOSECONDS,0纳秒,也就是说一旦有空闲线程会立即销毁该线程对象。
- workQueue:DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类,它也是实现按时调度的核心。
4. 二叉堆DelayedWorkQueue
DelayedWorkQueue和java.util.concurrent.DelayQueue
有着惊人的相似度:
- DelayedWorkQueue实现了一个容量无限的二叉堆,DelayQueue底层使用PriorityQueue实现二叉堆各种操作。
- DelayedWorkQueue存储了
java.util.concurrent.RunnableScheduledFuture
接口的实现类,DelayQueue存储java.util.concurrent.Delayed
接口的实现类,这两个接口有以下的继承关系(其中ScheduledThreadPoolExecutor
内部类ScheduledFutureTask
就实现了RunnableScheduledFuture
接口)
5. 为什么使用二叉堆
大学学过数据结构的应该学过堆排序吧:堆排序就是用小顶堆(或大顶堆)实现最小(或最大)的元素往堆顶移动。这里的DelayedWorkQueue就是使用二叉堆获取堆中延时最短的任务。具体的比较策略让我们看下面这个方法:
ScheduledThreadPoolExecutor.ScheduledFutureTask.compareTo()
1 | public int compareTo(Delayed other) { |
6. 为什么不用DelayQueue的二叉堆实现
java.util.concurrent.DelayQueue
就是根据延时获取元素的,那为什么不直接用DalayQueue
而重新定义一个DelayedWorkQueue
呢。这个问题本质上就是在问DelayQueue
与DelayedWorkQueue
的区别,我们看一下DelayedWorkQueue
注释中的一段话:
1 | static class DelayedWorkQueue extends AbstractQueue<Runnable> |
大致翻译过来:
1 | DelayedWorkQueue类似于DelayQueue和PriorityQueue,是基于“堆”的一种数据结构。 |
这里有几个地方可能有疑问:
1. remove操作的时间复杂度从O(n)降低到了O(log n)
1 | public boolean remove(Object x) { |
2. 任务的包装修饰
包装修饰主要是指两个ScheduledThreadPoolExecutor.decorateTask
方法。这部分内容放在文末“扩展ScheduledThreadPoolExecutor的功能”时讲。
7. 任务的提交
1 | public void execute(Runnable command) { |
我们看到原来ThreadPoolExecutor中的几个提交方法都被重写了,最终调用了个的都是schedule
方法,并且这几个方法的延时都为0纳秒。
8. schedule
既然前面任务的提交全部都是交给schedule方法执行,那么让我们看一下schedule相关的几个方法
下面的几个方法也是
ScheduledExecutorService
接口扩展的几个方法
下面需要注意的主要是scheduleAtFixedRate
和scheduleWithFixedDelay
两个方法的区别。
1 | // 触发时间 |
9. delayedExecute
上面的几个方法都是将runnable
或callable
包装成ScheduledFutureTask
对象,最终都是丢给delayedExecute
方法去执行:
1 | private void delayedExecute(RunnableScheduledFuture<?> task) { |
10. ScheduledFutureTask.run
添加线程后,线程肯定会从阻塞队列中获取任务,并执行任务的run方法,也就是ScheduledFutureTask的run方法:
1 | private class ScheduledFutureTask<V> |
11. ScheduledThreadPoolExecutor的其他配置项
1 | public class ScheduledThreadPoolExecutor |
12. 继承ScheduledThreadPoolExecutor对任务进行包装
ThreadPoolExecutor提供了beforeExecute,afterExecute,terminated三个钩子方法让我们重载以进行扩展。
ScheduledThreadPoolExecutor也提供了两个方法给我们扩展,下面是JDK文档提供的一个简单例子:
1 | public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { |
13. ScheduledThreadPoolExecutor尚有的缺点
ScheduledThreadPoolExecutor是使用纳秒为单位进行任务调度,它底层使用的是System.nanoTime()
来获取时间:
1 | final long now() { |
这个时间是相对于JVM虚拟机启动的时间,这个纳秒值在$2^{63}纳秒 \approx 292年$后会溢出(几乎可以忽略溢出问题),ScheduledThreadPoolExecutor也对溢出进行了处理:
1 | long triggerTime(long delay) { |
既然ScheduledThreadPoolExecutor已经处理了,那还有什么问题吗。问题就在于我们无法使用yyyy-MM-dd HH-mm-ss
这种精确时间点的方式进行任务的调度。
不过在SpringTask 以及 Quartz等框架中已经解决了这个问题,并提供了cron表达式来精确任务的调度时间。后续如果有机会对这些框架的原理进行分析。
SpringTask既可以单独使用也可以整合Quartz使用,除了Quartz还有一个轻量级的Cron4j可以实现任务调度,不过Cron4j并没有用线程池(估计那时候java5还没出来),每个任务都会去创建一个新线程。