Java FutureTask 源码解析

Java 面试 juc About 6,354 words

说明

本文基于Java8

构造方法

传入Callable对象,使用成员变量接收,并将状态改为NEW

private Callable<V> callable;

private volatile int state;

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // 确保让 callable 可见
}

state 状态

state可能的几种过渡状态:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED
/**
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

run 方法

如果state不为NEW,或者CAS赋值runner不成功就直接return

调用callablecall()方法,正常运行则result赋值为call()方法的返回值、标识位ran置为true、用CAS先将stateNEW设置为COMPLETING,再将result赋值给outcomeoutcome成员变量是get()方法调用时返回的结果),

若出现异常则将result置为null、标识位ran置为false、用CAS先将stateNEW设置为COMPLETING,再将异常结果赋值给outcome,最后将state设置为EXCEPTIONAL

finishCompletion()方法再解析完get()方法后再做分析。

// 运行 callable 的线程;在 run() 方法调用时使用 CAS 赋值
private volatile Thread runner;

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

// get() 方法调用时返回的结果
private Object outcome;// 由 state 维护

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

private static final long runnerOffset;

// 省略了部分代码
runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));

get() 方法

如果stateNEW或者COMPLETING,则执行awaitDone()方法并接收返回值state。解除等待后执行report()方法。

awaitDone() 方法

无限循环中有如下步骤:

  1. 判断线程是否被中断,中断则移出等待节点并抛出异常。
  2. 如果当前state大于COMPLETING(即:state可能已经完成,异常,取消,中断),将等待节点中的thread引用置空,进行下一次循环。
  3. 如果当前state等于COMPLETING(正在完成中),将线程从运行状态变为就绪状态,进行下一次循环。
  4. 如果等待节点为空,new一个WaitNode,进行下一次循环。
  5. 如果queued没有排队的,将第4步中new的等待节点,从头步插入waiters等待节点链表中,插入成功将queued置为true,表示有排队的节点了,将进行下一次循环。
  6. 判断是否设置了超时等待,进行下一次循环。
  7. 都没有命中if条件,则将当前线程阻塞。

awaitDone()方法正常结束无限循环的条件只能是,unpark()取消阻塞后state大于COMPLETING(正常完成中),即awaitDone()方法正常返回的state只能是:NORMALEXCEPTIONALCANCELLED

report() 方法

如果state等于NORMAL,则将outcome成员变量的结果返回回去,如果state大于等于CANCELLED,则抛出CancellationException取消异常,其他state抛出ExecutionException执行异常。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

finishCompletion() 方法

将等待节点waiters中的元素遍历删除,并且将waiterspark()的线程都unpark()释放。done()方法为空实现,不做任何事情。将callable置空。

private volatile WaitNode waiters;

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

protected void done() { }

private static final long waitersOffset;

// 省略了部分代码
waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
Views: 2,683 · Posted: 2021-03-26

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb/LiteNote

扫描下方二维码关注公众号和小程序↓↓↓

扫描下方二维码关注公众号和小程序↓↓↓


Today On History
Browsing Refresh