`
沉沦的快乐
  • 浏览: 55711 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

并发编程之线程池ScheduledThreadPoolExecutor原理探究

阅读更多

简介 

       上一篇文章剖析了ThreadPoolExecutor的原理和实现,这里剖析下ScheduledThreadPoolExecutor的实现方式。ScheduledThreadPoolExecuto能够执行延迟固定时间的任务和周期性任务。在需要多工作线程的情况下,它比Timer更方便,同时比ThreadPoolExecutor有更大灵活性和能力。

      但是在使用ScheduledThreadPoolExecutor时,有几个点必须注意:

1.在执行延迟任务时,任务的执行不会早于你设定的延迟时间,但是也不保证恰好在达到你设定的延迟时间时就马上执行。在同一时间开始执行的任务,按照FIFO顺序提交给线程。

2.虽然ScheduledThreadPoolExecutor继承了ThreadPoolExecutor。但是针对ThreadPoolExecutor的一些性能调优的方法不一定试用于ScheduledThreadPoolExecutor。在ThreadPoolExecutor中,调整corePoolSize,maximumPoolSize两个参数可以控制任务的执行速度,例如当前线程小于corePoolSize时,可以马上为新任务创建线程而不必等待,当前线程大于corePoolSize小于maximumPoolSize。可以动态扩展当前线程以提高并发速度等。但是对于ScheduledThreadPoolExecutor,它实际上是一个固定线程数,无界阻塞队列的线程池,所有任务都是先放到工作队列中,而不会有直接为其创建线程的优待。所以调整corePoolSize,maximumPoolSize对ScheduledThreadPoolExecutor来说,性能上不会有影响。

 

 实现原理

     ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,它先把用户提交的任务进一步封装成ScheduledFutureTask。再把ScheduledFutureTask放到任务队列中,交给线程池的线程去执行。可以说ScheduledThreadPoolExecutor与ThreadPoolExecutor的最大区别就是多了一步封装ScheduledFutureTask的过程。其他像任务队列的执行,线程的管理等都于ThreadPoolExecutor差不多,甚至就是直接使用了ThreadPoolExecutor的方法。下面就主要研究下ScheduledThreadPoolExecutor与ThreadPoolExecutor的不同地方。与ThreadPoolExecutor相关的这里直接省略,需要参考ThreadPoolExecutor的地方,请移步ThreadPoolExecutor的原理和实现

 

ScheduledFutureTask的设计

    把用户任务进一步封装成ScheduledFutureTask是ScheduledThreadPoolExecutor与ThreadPoolExecutor的主要区别。因此ScheduledFutureTask的设计就是ScheduledThreadPoolExecutor的核心。ScheduledFutureTask是ScheduledThreadPoolExecutor的一个内部类。ScheduledFutureTask继承了FutureTask。先看下ScheduledFutureTask独有的属性:

属性 属性描述
long sequenceNumber 跳出FIFO调度关系的序列号
long time 激活任务的时间,单位是纳秒。注意:这个时间不是需要延迟的时间。而是用纳秒表示的自然时间。
long period 周期性任务的时间间隔,单位是纳秒。正数表示固定频率,负数表示固定的延迟时间。

ScheduledFutureTask提供了三种构造方法:

1.ScheduledFutureTask(Runnable r, V result, long ns)。 创建设定了任务执行的触发时间的一次性任务

1.ScheduledFutureTask(Runnable r, V result, long ns, long period)。创建设定了任务触发时间和周期时间的周期性任务。

2.ScheduledFutureTask(Callable<V> callable, long ns)。创建Callable,固定触发时间的一次性任务。

 ScheduledFutureTask继承自FutureTask,所以需要实现自己的run方法:

 

public void run() {
            if (isPeriodic())
                runPeriodic();
            else
                ScheduledFutureTask.super.run();
        }

     如果是一次性任务,则执行FutureTask的run方法,如果是周期性任务,则执行runPeriodic()方法。下面看下runPeriodic()做了什么事情:

 

private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }

       首先,先执行FutureTask的runAndReset()。这个方法尝试执行一次用户任务,重置状态,并返回是否成功执行。判断下状态:任务成功执行了,线程池没有shutdown,任务没有被取消。那么设置任务的下一次执行时间。这里设置下次执行时间比较有意思如果period>0则time=time+p,若果period<=0,time=now()+p。即前者是上次任务开始的时间加上时间周期,后置是上一次任务执行完成后的时间加上时间周期。这样说有点拗口,换种方式来说:当把时间周期设置为正数,那么表示该任务每多长时间执行一次,任务的执行频率是固定的。如果间周期设置为负数,表示前一次任务执行完之后间隔多长时间开始执行下一次任务,每个任务之间的延时是固定的。

 

 ScheduledThreadPoolExecutor的实现

     ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并且实现了ScheduledExecutorService接口。

ScheduledExecutorService定义了使用ScheduledThreadPoolExecutor提交任务的接口schedule。schedule是一组返回值和参数不同的overwrite接口。以供用户选择提交任务的模式,比如是一次性的定时任务还是周期性的任务,需不需要返回结果等等。不管是哪个schedule接口,他的实现功能主要是把用户提交的任务和模式封装成ScheduledFutureTask,然后调用delayedExecute(Runnable command)。所以delayedExecute方法是真正的处理任务的方法,代码如下:

 

private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        // Prestart a thread if necessary. We cannot prestart it
        // running the task because the task (probably) shouldn't be
        // run yet, so thread will just idle until delay elapses.
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();

        super.getQueue().add(command);
    }

      如果线程池shutdown了,则执行reject策略。如果当前线程小于核心线程,那么调用prestartCoreThread()创建一个线程线程。prestartCoreThread()实际上是调用了ThreadPoolExecutor类里的addIfUnderCorePoolSize(null)。如果你看了ThreadPoolExecutor的分析。就会知道这个addIfUnderCorePoolSize(null)的参数设置为null其实就是只创建线程,而不会把当前任务直接提交给这个线程。任务直接通过super.getQueue().add(command);放到任务队列里去了。这也解释了前面简介中第2点说了为什么调整corePoolSize也不会让任务马上执行了,不管你corePoolSize调多大,你的任务还是需要排队,只是你的线程多了,可能任务排队等待的时间少了。

       另外这里也可以解释简介中第1点注意事项:对于定时任务,只能保证执行时间不会早于你设定的时间。但不一定会恰好在你设定的时间点上执行。因为你提交的任务都是排队的。加入你12点00分00秒提交了一个任务,希望这个任务在12点00分05秒的时刻执行。加入你的线程任务处理的足够快,在12点00分03秒的时候这个任务得到了一个线程,那么这个线程开始执行你的任务,发现你的执行时间是12点00分05秒,那么线程会等2秒到12点00分05秒的时候开始执行你的任务;但是如果线程池线程数比较少或者任务执行比较耗时等,可能使你的任务在12点00分10秒的时刻才分配到了一个线程,然后线程发现你的任务已经过了到了执行时间,马上开始执行你的任务,这种情况下,相当于任务比你设定的时间延迟了5秒。

       

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics