欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

并发系列(4)之 Future 框架详解

程序员文章站 2023-10-24 18:36:42
本文将主要讲解 J.U.C 中的 Future 框架,并分析结合源码分析其内部结构逻辑; 一、Future 框架概述 JDK 中的 Future 框架实际就是 Future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例; 1. 应用实例 打印: 如上面代码 ......

本文将主要讲解 j.u.c 中的 future 框架,并分析结合源码分析其内部结构逻辑;

一、future 框架概述

jdk 中的 future 框架实际就是 future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例;

1. 应用实例

futuretask<string> future = new futuretask<>(() -> {
  log.info("异步任务执行...");
  thread.sleep(2000);
  log.info("过了很久很久...");
  return "异步任务完成";
});

log.info("启动异步任务...");
new thread(future).start();

log.info("继续其他任务...");
thread.sleep(1000);

log.info("获取异步任务结果:{}", future.get());

打印:

[15:38:03,231 info ] [main]     - 启动异步任务...
[15:38:03,231 info ] [main]     - 继续其他任务...
[15:38:03,231 info ] [thread-0] - 异步任务执行...
[15:38:05,232 info ] [thread-0] - 过了很久很久...
[15:38:05,236 info ] [main]     - 获取异步任务结果:异步任务完成

如上面代码所示,首先我们将要执行的任务包装成 callable,这里如果不需要返回值也可以使用 runnable;然后构建 futuretask 由一个线程启动,最后使用 future.get() 获取异步任务结果;

2. future 运行逻辑

对于 future 模式的流程图如下:

并发系列(4)之 Future 框架详解

对比上面的实例代码,大家可能会发现有些不一样,因为在 futuretask 同时继承了 runnable 和 future 接口,所以再提交任务后没有返回future,而是直接使用自身调用 get;下面我们就对源码进行实际分析;


二、源码分析

1. futuretask 主体结构

public interface runnablefuture<v> extends runnable, future<v> {}

public class futuretask<v> implements runnablefuture<v> {
  private volatile int state;         // 任务运行状态
  private callable<v> callable;       // 异步任务
  private object outcome;             // 返回结果
  private volatile thread runner;     // 异步任务执行线程
  private volatile waitnode waiters;  // 等待异步结果的线程栈(通过treiber stack算法实现)
  
  public futuretask(callable<v> callable) {  // 需要返回值
    if (callable == null)
      throw new nullpointerexception();
    this.callable = callable;
    this.state = new;     // ensure visibility of callable
  }
  
  public futuretask(runnable runnable, v result) {
    this.callable = executors.callable(runnable, result);
    this.state = new;     // ensure visibility of callable
  }
  ...
}

另外在代码中还可以看见有很多地方都是用了 cas 来更新变量,而 jdk1.6 中甚至使用了 aqs 来实现;其原因就是同一个 futuretask 可以多个线程同时提交,也可以多个线程同时获取; 所以代码中有很多的状态变量:

// futuretask.state 取值
private static final int new          = 0;  // 初始化到结果返回前
private static final int completing   = 1;  // 结果赋值
private static final int normal       = 2;  // 执行完毕
private static final int exceptional  = 3;  // 执行异常
private static final int cancelled    = 4;  // 任务取消
private static final int interrupting = 5;  // 设置中断状态
private static final int interrupted  = 6;  // 任务中断

同时源码的注释中也详细给出了可能出现的状态转换:

  • new -> completing -> normal // 任务正常执行
  • new -> completing -> exception // 任务执行异常
  • new ->cancelled // 任务取消
  • new -> initerrupting -> interrupted // 任务中断

注意这里的 completing 状态是一个很微妙的状态,正因为有他的存在才能实现无锁赋值;大家先留意这个状态,然后在代码中应该能体会到;另外这里还有一个变量需要注意,waitnode ;使用 treiber stack 算法实现的无锁栈;其原理说明可以参考下面第三节;


2. 任务执行

public void run() {
  if (state != new ||  // 确保任务执行完成后,不再重复执行
    !unsafe.compareandswapobject(this, runneroffset, 
                                 null, thread.currentthread()))  // 确保只有一个线程执行
    return;
  try {
    callable<v> c = callable;
    if (c != null && state == new) {
      v result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
      } catch (throwable ex) {
        result = null;
        ran = false;
        setexception(ex);    // 设置异常结果
      }
      if (ran) set(result);  // 设置结果
    }
  } finally {
    runner = null;
    int s = state;
    if (s >= interrupting) handlepossiblecancellationinterrupt(s);  // 确保中断状态已经设置
  }
}
// 设置异步任务结果
protected void set(v v) {
  if (unsafe.compareandswapint(this, stateoffset, new, completing)) {  // 保证结果只能设置一次
    outcome = v;
    unsafe.putorderedint(this, stateoffset, normal); // final state
    finishcompletion(); // 唤醒等待线程
  }
}
protected void setexception(throwable t) {
  if (unsafe.compareandswapint(this, stateoffset, new, completing)) {  // 保证结果只能设置一次
    outcome = t;
    unsafe.putorderedint(this, stateoffset, exceptional); // final state
    finishcompletion();
  }
}


3. 任务取消

public boolean cancel(boolean mayinterruptifrunning) {
  if (!(state == new &&  // 只有在任务执行阶段才能取消
      unsafe.compareandswapint(this, stateoffset, new,  // 设置取消状态
        mayinterruptifrunning ? interrupting : cancelled)))
    return false;
  try {  // in case call to interrupt throws exception
    if (mayinterruptifrunning) {
      try {
        thread t = runner;
        if (t != null)
          t.interrupt();
      } finally { // final state
        unsafe.putorderedint(this, stateoffset, interrupted);
      }
    }
  } finally {
    finishcompletion();
  }
  return true;
}

注意 cancel(false) 也就是仅取消,并没有打断;异步任务会继续执行,只是这里首先设置了 futuretask.state = cancelled ,所以最后在设置结果的时候会失败,unsafe.compareandswapint(this, stateoffset, new, completing)


4. 获取结果

public v get() throws interruptedexception, executionexception {
  int s = state;
  if (s <= completing)
    s = awaitdone(false, 0l);  // 阻塞等待
  return report(s);
}

private v report(int s) throws executionexception {  // 根据最后的状态返回结果
  object x = outcome;
  if (s == normal) return (v)x;
  if (s >= cancelled) throw new cancellationexception();
  throw new executionexception((throwable)x);
}
private int awaitdone(boolean timed, long nanos)
  throws interruptedexception {
  final long deadline = timed ? system.nanotime() + nanos : 0l;
  waitnode q = null;
  boolean queued = false;
  for (;;) {
    if (thread.interrupted()) {
      removewaiter(q);   // 移除等待节点
      throw new interruptedexception();
    }

    int s = state;
    if (s > completing) {  // 任务已完成
      if (q != null)
        q.thread = null;
      return s;
    }
    else if (s == completing) // 正在赋值,直接先出让线程
      thread.yield();
    else if (q == null)       // 任务还未完成需要等待
      q = new waitnode();
    else if (!queued)
      queued = unsafe.compareandswapobject(this, waitersoffset,
                         q.next = waiters, q);   // 使用 treiber stack 算法
    else if (timed) {
      nanos = deadline - system.nanotime();
      if (nanos <= 0l) {
        removewaiter(q);
        return state;
      }
      locksupport.parknanos(this, nanos);
    }
    else
      locksupport.park(this);
  }
}


三、treiber stack

在《java 并发编程实战》中讲了, 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性 。

@threadsafe public class concurrentstack <e> {
  atomicreference<node<e>> top = new atomicreference<>();
  
  private static class node <e> {
    public final e item;
    public node<e> next;

    public node(e item) {
      this.item = item;
    }
  }

  public void push(e item) {
    node<e> newhead = new node<>(item);
    node<e> oldhead;
    do {
      oldhead = top.get();
      newhead.next = oldhead;
    } while (!top.compareandset(oldhead, newhead));
  }

  public e pop() {
    node<e> oldhead;
    node<e> newhead;
    do {
      oldhead = top.get();
      if (oldhead == null)
        return null;
      newhead = oldhead.next;
    } while (!top.compareandset(oldhead, newhead));
    return oldhead.item;
  }
}


总结

  • 总体来讲源码比较简单,因为其本身只是一个 future 模式的实现
  • 但是其中的状态量的设置,还有里面很多无锁的处理方式,才是 futuretask 带给我们的精华!