`
沉沦的快乐
  • 浏览: 55835 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

并发编程之FutureTask的实现解析

阅读更多

简介

        很多时候我们希望创建一个新线程去执行一个功能相对独立的任务,并在任务完成之后返回执行结果。如果实现Runnable接口,没有办法获得返回值;如果实现Callable接口,必须使用ExecutorService来执行,不能使用更简单灵活的new thread方法来实现。为了解决上面两个问题,于是有了FutureTask。FutureTask实现了Runnable, Future<V>,所以它既可以通过new thread来跑任务,也可以通过ExecutorService来管理任务,同时FutureTask还提供了超时返回的功能。另外还有一个非常有用的功能是它提供了一个可重载方法done(),done()会在任务执行完之后被调用,用户可以override该方法,来执行一些数据清理,句柄释放等操作。下面来看下FutureTask的具体实现。

 

FutureTask实现原理

API接口

接口 接口描述
boolean cancel(boolean mayInterruptIfRunning) 取消任务,参数表示是否取消正在执行的任务
boolean isCancelled() 任务在正常结束前是否被取消
boolean isDone(); 任务是否执行完
V get() 返回任务结果
V get(long timeout, TimeUnit unit) 带超时时间的任务返回方法

实现原理

跟concurrent包中大多数并发类的设计一样,FutureTask也是基于AQS设计了一个内部类Sync来实现FutureTask的功能。Sync的通过传入Callable<V> callable来构造一个对象,这个callable就是需要执行的任务,V是任务返回的类型。Sync通过AQS的state属性来管理任务状态,比如1是RUNNING,2是RAN,4是CANCELLED。下面分别来剖析FutureTask的API的实现。

 

   1.isCancelled()

public boolean isCancelled() {
        return sync.innerIsCancelled();
    }
   真正的实现是通过sync的innerIsCancelled()来实现的,可以看出AQS的state属性值是保存任务状态的。

 

   

 boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }
 
2.isDone()
public boolean isDone() {
        return sync.innerIsDone();
    }
    真正的实现是通过sync的innerIsDone()实现的,判断state的状态是RAN或者CANCELLED,并且这个任务的执行线程是否为null。
boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }

 private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
 
3.cancel(boolean mayInterruptIfRunning)
public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }
    取消任务的方法是Sync的innerCancel(boolean mayInterruptIfRunning)来实现的:
boolean innerCancel(boolean mayInterruptIfRunning) {
	    for (;;) {
		int s = getState();
		if (ranOrCancelled(s))
		    return false;
		if (compareAndSetState(s, CANCELLED))
		    break;
	    }
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            releaseShared(0);
            done();
            return true;
        }
    这个方法通过轮询的方式 ,采用CAS(compareAndSet)的同步方式来把state状态设置为CANCELLED。如果设置参数需要中断正在执行的任务,则调用interrupt()来中断任务。最后通过tryReleaseShared释放执行线程。并且执行done()方法来执行一些清理操作。FutureTask的done()方法没有执行让任何操作,用户可以通过extends的方法来override该方法。tryReleaseShared的方法很简单,仅仅把执行线程runner设置为null。
 
4.public V get()
public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }
  获取任务的返回值,该方法委托Sync的innerGet()来实现。
V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
     acquireSharedInterruptibly方法是AQS里面非常重要的一个方法,他以轮询的方式来阻塞线程,直到获得中断或者满足退出轮询条件来终结果阻塞,运行后面的代码。acquireSharedInterruptibly的退出阻塞状态的条件是tryAcquireShared方法的返回值大于0,tryAcquireShared方法是需要用户来override的。在这里tryAcquireShared返回大于0的条件是任务是否完成或取消。
protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
 如果acquireSharedInterruptibly捕获中断状态,则抛出中断异常,否则执行下面的代码,如果状态为CANCELLED,则抛出CancellationException;如果Sync的exception属性不为空,则抛出这个exception。如果到这里innerGet还没退出,说明任务是执行完的,则返回任务结果。
 
5.public V get(long timeout, TimeUnit unit)
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }
 带超时时间的获取任务结果方法,委托sync的innerGet(long nanosTimeout)来执行:
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
 这个方法与上面讲的不带超时时间的方法差不多,只是tryAcquireSharedNanos超时之后会抛出超时异常。
 
6.public void run()
public void run() {
        sync.innerRun();
    }
  启动任务运行方法,委托sync的innerRun()来实现。
void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
 首先通过CAS方法把state更新为RUNNING状态,如果更新失败,说明这个任务的状态不是初始状态(0),说明这个任务被取消了,或者运行完了,或者正在运行。直接退出方法。只有state能更新为RUNNING状态,则说明能够开始执行任务。状态更新RUNNING之后,任务真正执行之前,需要在检查下状态是否为RUNING。因为这个反复没有加锁,状态可能被更新为取消了。所以如果检查状态不在为RUNNING,则释放线程资源,否则把runner设置为当前线程,并通过innerSet(callable.call());执行用户的任务,然后设置任务状态。
void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
		    return;
                }
            }
        }
 如果状态已经运行结束了 直接退出;如果状态设置为CANCELLED,则强制释放线程并试图中断正在执行的任务并退出。如果既没运行完也没取消,则通过CAS把状态设置为RAN,把结果设置为call()返回的结果,然后释放线程(runner属性设置为null),并执行done()操作。
 
 
分享到:
评论

相关推荐

    Java并发编程原理与实战

    提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 并发容器CopyOnWriteArrayList原理与使用.mp4 并发容器...

    Java并发编程实战

    5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6 构建高效且可伸缩的结果缓存 第二部分 结构化并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.1.1 串行地执行任务 6.1.2 显式地为任务创建线程 6.1.3 ...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    简谈java并发FutureTask的实现

    主要介绍了简谈java并发FutureTask的实现,FutureTask都是用于获取线程执行的返回结果。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    Java 并发编程实战

    5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6 构建高效且可伸缩的结果缓存 第二部分 结构化并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.1.1 串行地执行任务 6.1.2 显式地为任务创建线程 6.1.3 ...

    并发编程笔记20190526.docx

    4、Callable、Future和FutureTask 30 5、原子操作CAS (compare atomic swap) 32 三、显式锁和AQS 34 1、AQS定义两种资源共享方式: 34 2、深入源码 37 3、了解Condition的实现 42 4、 锁的可重入 44 第三章 并发容器...

    FutureTask:FutureTask原始解析与重组-源码解析

    FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...

    龙果java并发编程完整视频

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    java并发编程

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    JAVA线程总结,包含线程池,显示使用线程实现异步编程,基于JDK中的Future实现异步编程,JDK中的FutureTask等

    JAVA线程总结,包含线程池,显示使用线程实现异步编程,基于JDK中的Future实现异步编程,JDK中的FutureTask等

    Java并发编程与高并发解决方案

    Java并发编程与高并发解决方案:线程池,消息队列,服务拆分,限流,降级,熔断思路,数据库分表

    Java线程池FutureTask实现原理详解

    主要介绍了Java线程池FutureTask实现原理详解,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下

    Java并发编程(学习笔记).xmind

    Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...

    【2018最新最详细】并发多线程教程

    1.并发编程的优缺点 2.线程的状态转换以及基本操作 3.java内存模型以及happens-before规则 4.彻底理解synchronized 5.彻底理解volatile 6.你以为你真的了解final吗? 7.三大性质总结:原子性、可见性以及有序性 8....

    2万字Java并发编程面试题合集(含答案,建议收藏)

    2万字Java并发编程面试题合集(含答案,建议收藏) 具体如下 1、在 java 中守护线程和本地线程区别?2、线程与进程的区别?3、什么是多线程中的上下文切换?4、死锁与活锁的区别,死锁与饥饿的区别?5、Java 中用到...

    futuretask用法及使用场景介绍

    主要介绍了futuretask用法及使用场景介绍,小编觉得挺不错的,这里分享给大家,供大家参考。

Global site tag (gtag.js) - Google Analytics