欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

并发系列(6)之 ThreadPoolExecutor 详解

程序员文章站 2022-03-30 22:17:18
本文将主要介绍我们平时最常用的线程池 ,有可能你平时没有直接使用这个类,而是使用 的工厂方法创建线程池,虽然这样很简单,但是很可能因为这个线程池发生 OOM ,具体情况文中会详细介绍; 二、ThreadPoolExecutor 概览 的继承关系如图所示: 其中: Executor :定义了 异步接口 ......

本文将主要介绍我们平时最常用的线程池 threadpoolexecutor ,有可能你平时没有直接使用这个类,而是使用 executors 的工厂方法创建线程池,虽然这样很简单,但是很可能因为这个线程池发生 oom ,具体情况文中会详细介绍;

二、threadpoolexecutor 概览

threadpoolexecutor 的继承关系如图所示:

并发系列(6)之 ThreadPoolExecutor  详解

其中:

  • executor:定义了 executor(runnable command) 异步接口,但是没有强制要求异步;
  • executorservice:提供了生命周期管理的方法,以及有返回值的任务提交;
  • abstractexecutorservice:提供了 executorservice 的默认实现;


1. 主体结构

public class threadpoolexecutor extends abstractexecutorservice {
  private final atomicinteger ctl = new atomicinteger(ctlof(running, 0));  // 状态控制变量,核心
  private final blockingqueue<runnable> workqueue;                         // 任务等待队列
  private final hashset<worker> workers = new hashset<worker>();           // 工作线程集合
  private volatile threadfactory threadfactory;       // 线程构造工厂
  private volatile rejectedexecutionhandler handler;  // 拒绝策略
  private volatile long keepalivetime;                // 空闲线程的存活时间(非核心线程)
  private volatile int corepoolsize;                  // 核心线程大小
  private volatile int maximumpoolsize;               // 工作线程最大容量

  public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit,
                            blockingqueue<runnable> workqueue, threadfactory threadfactory,
                            rejectedexecutionhandler handler) {
    if (corepoolsize < 0 || maximumpoolsize <= 0 ||
        maximumpoolsize < corepoolsize || keepalivetime < 0)
        throw new illegalargumentexception();
    if (workqueue == null || threadfactory == null || handler == null)
        throw new nullpointerexception();
    this.acc = system.getsecuritymanager() == null ? null : accesscontroller.getcontext();
    this.corepoolsize = corepoolsize;
    this.maximumpoolsize = maximumpoolsize;
    this.workqueue = workqueue;
    this.keepalivetime = unit.tonanos(keepalivetime);
    this.threadfactory = threadfactory;
    this.handler = handler;
  }
  ...
}

这里已经可以大致看出 threadpoolexecutor 的结构了:

并发系列(6)之 ThreadPoolExecutor  详解


2. worker 结构

private final class worker extends abstractqueuedsynchronizer implements runnable {
  final thread thread;  // 持有线程,只有在线程工厂运行失败时为空
  runnable firsttask;   // 初始化任务,不为空的时候,任务直接运行,不在添加到队列
  volatile long completedtasks;  // 完成任务计数

  worker(runnable firsttask) {
    setstate(-1);   // aqs 初始化状态
    this.firsttask = firsttask;
    this.thread = getthreadfactory().newthread(this);
  }

  public void run() {
    runworker(this);  // 循环取任务执行
  }
  ...
  // aqs 锁方法
}

这里很容易理解的是 threadfirsttask;但是 worker 还继承了 aqs 做了一个简易的互斥锁,主要是在中断或者 worker 状态改变的时候使用;具体 aqs 的详细说明可以参考,abstractqueuedsynchronizer 源码分析


3. ctl 控制变量

ctl 控制变量(简记 c)是一个 atomicinteger 类型的变量,由两部分信息组合而成(两个值互补影响,又可以通过简单的大小比较判断状态):

  • 线程池的运行状态 (runstate,简记 rs),由 int 高位的前三位表示;
  • 线程池内有效线程的数量 (workercount,简记 wc),由 int 地位的29位表示;

源码如下:

private final atomicinteger ctl = new atomicinteger(ctlof(running, 0));
private static final int count_bits = integer.size - 3;       // 用来表示线程数量的位数
private static final int capacity   = (1 << count_bits) - 1;  // 线程最大容量

                                                         // 状态量
private static final int running    = -1 << count_bits;  // 高位 111,第一位是符号位,1表示负数
private static final int shutdown   =  0 << count_bits;  // 高位 000
private static final int stop       =  1 << count_bits;  // 高位 001
private static final int tidying    =  2 << count_bits;  // 高位 010
private static final int terminated =  3 << count_bits;  // 高位 011

private static int runstateof(int c)     { return c & ~capacity; }  // 运行状态,取前3位
private static int workercountof(int c)  { return c & capacity; }   // 线程数量,取后29位
private static int ctlof(int rs, int wc) { return rs | wc; }        // 状态和数量合成

private static boolean runstatelessthan(int c, int s) { return c < s; } // 状态比较
private static boolean runstateatleast(int c, int s) { return c >= s; } 
private static boolean isrunning(int c) { return c < shutdown; } // running 是负数,必然小于 shutdown

代码中可以看到状态判断的时候都是直接比较的,这是因为 terminated > tidying > stop > shutdown > running ;他们的状态变迁关系如下:

并发系列(6)之 ThreadPoolExecutor  详解

其中:

  • running:运行状态,可接收新任务;
  • shutdown:不可接收新任务,继续处理已提交的任务;
  • stop:不接收、不处理任务,中断正在进行的任务
  • tidying:所有任务清空,线程停止;
  • terminated:钩子方法,执行后的最终状态;


三、threadpoolexecutor 源码分析

1. 增加工作线程

    private boolean addworker(runnable firsttask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runstateof(c);

            // 这里正常情况下,只要大于shutdown,则必然不能添加线程;但是这里做了一个优化,
            // 如果线程池还在继续处理任务,则可以添加线程加速处理,
            // shutdown 表示不接收新任务,但是还在继续处理,
            // firsttask 不为空时,是在添加线程的时候,firsttask 不入队,直接处理
            // workqueue 不为空时,则还有任务需要处理
            // 所以连起来就是 rs == shutdown && firsttask == null && ! workqueue.isempty()
            if (rs >= shutdown &&
                ! (rs == shutdown && firsttask == null && ! workqueue.isempty()))
                return false;

            for (;;) {
                int wc = workercountof(c);
                if (wc >= capacity ||  // 容量超出,则返回
                    wc >= (core ? corepoolsize : maximumpoolsize))
                    return false;
                if (compareandincrementworkercount(c))
                    break retry;  // 线程数增加成功,则跳出循环
                c = ctl.get();  // re-read ctl
                if (runstateof(c) != rs)  // 如果线程状态改变时,重头开始重试
                    continue retry;
            }
        }
        // 此时线程计数,增加成功
      
        boolean workerstarted = false;
        boolean workeradded = false;
        worker w = null;
        try {
            w = new worker(firsttask);
            final thread t = w.thread;
            if (t != null) {  // 线程创建失败时,直接退出
                final reentrantlock mainlock = this.mainlock;
                mainlock.lock();
                try {
                    int rs = runstateof(ctl.get());
                    if (rs < shutdown ||
                        (rs == shutdown && firsttask == null)) { // 这里同样检查上面的优化条件
                        if (t.isalive()) // 如果线程已经启动,则状态错误;
                            throw new illegalthreadstateexception();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestpoolsize) largestpoolsize = s;  // 记录工作线程的最大数,统计峰值用
                        workeradded = true;
                    }
                } finally {
                    mainlock.unlock();
                }
                if (workeradded) {
                    t.start();  // 启动线程
                    workerstarted = true;
                }
            }
        } finally {
            if (! workerstarted) addworkerfailed(w); // 添加失败清除
        }
        return workerstarted;
    }

2. 提交任务

public void execute(runnable command) {
  if (command == null) throw new nullpointerexception();
  int c = ctl.get();
  if (workercountof(c) < corepoolsize) {   // 如果小于核心线程,直接添加
    if (addworker(command, true)) return;
    c = ctl.get();
  }
  if (isrunning(c) && workqueue.offer(command)) {  // 任务入队
    int recheck = ctl.get();
    if (!isrunning(recheck) && remove(command))  // 再次检查,状态不是running的时候,拒绝并移除任务
      reject(command);
    else if (workercountof(recheck) == 0)  // 这里是防止状态为shutdown时,已经添加的任务无法执行
      addworker(null, false);
  }
  else if (!addworker(command, false))  // 任务入队失败时,直接添加线程,并运行
    reject(command);
}

流程图如下:

并发系列(6)之 ThreadPoolExecutor  详解

所以影响任务提交的因数就有:

  • 核心线程的大小;
  • 是否为阻塞队列;
  • 线程池的大小;


3. 处理任务

工作线程启动之后,首先处理 firsttask 任务(特别注意,这个任务是没有入队的),然后从 workqueue 中取出任务处理,队列为空时,超时等待 keepalivetime

final void runworker(worker w) {
  thread wt = thread.currentthread();
  runnable task = w.firsttask;
  w.firsttask = null;
  w.unlock(); // allow interrupts
  boolean completedabruptly = true;
  try {
    while (task != null || (task = gettask()) != null) {  // 获取任务
      w.lock();
      // 总体条件表示线程池停止的时候,需要中断线程,
      // 如果没有停止,则清除中断状态,确保未中断
      if ((runstateatleast(ctl.get(), stop) ||
         (thread.interrupted() && runstateatleast(ctl.get(), stop))) &&
        !wt.isinterrupted()) 
        wt.interrupt();
      try {
        beforeexecute(wt, task); // 回调方法
        throwable thrown = null;
        try {
          task.run();
        } catch (runtimeexception x) {
          thrown = x; throw x;
        } catch (error x) {
          thrown = x; throw x;
        } catch (throwable x) {
          thrown = x; throw new error(x);
        } finally {
          afterexecute(task, thrown);  // 回调方法
        }
      } finally {
        task = null;
        w.completedtasks++;
        w.unlock();
      }
    }
    completedabruptly = false;
  } finally {
    processworkerexit(w, completedabruptly);  // 退出时清理
  }
}
private runnable gettask() {
  boolean timedout = false; // did the last poll() time out?

  for (;;) {
    int c = ctl.get();
    int rs = runstateof(c);

    // 此处保证 shutdown 状态继续处理任务,stop 状态停止处理
    if (rs >= shutdown && (rs >= stop || workqueue.isempty())) {
      decrementworkercount();
      return null;
    }
    int wc = workercountof(c);
    boolean timed = allowcorethreadtimeout || wc > corepoolsize;  // 是否关闭空闲线程

    if ((wc > maximumpoolsize || (timed && timedout))  // 如果线程大于最大容量,或者允许关闭,且第一次没取到
      && (wc > 1 || workqueue.isempty())) {            // 返回空,最后由 processworkerexit 清理
      if (compareanddecrementworkercount(c))
        return null;
      continue;
    }

    try {
      // 是否超时获取
      runnable r = timed ? workqueue.poll(keepalivetime, timeunit.nanoseconds) : workqueue.take();
      if (r != null)
        return r;
      timedout = true;
    } catch (interruptedexception retry) {
      timedout = false;
    }
  }
}

4. 停止线程池

public void shutdown() {
  final reentrantlock mainlock = this.mainlock;
  mainlock.lock();
  try {
    checkshutdownaccess();     // 检查停止权限
    advancerunstate(shutdown); // 设置线程池状态
    interruptidleworkers();    // 设置所有线程中断
    onshutdown();              // hook for scheduledthreadpoolexecutor
  } finally {
    mainlock.unlock();
  }
  tryterminate();              // 继续执行等待队列中的任务,完毕后设置 terminated 状态
}
public list<runnable> shutdownnow() {
  list<runnable> tasks;
  final reentrantlock mainlock = this.mainlock;
  mainlock.lock();
  try {
    checkshutdownaccess();
    advancerunstate(stop);
    interruptworkers();
    tasks = drainqueue();   // 清空所有等待队列的任务,并返回
  } finally {
    mainlock.unlock();
  }
  tryterminate();
  return tasks;
}

可以看到 shutdownnow 只比 shutdown 多了,清空等待队列,但是正在执行的任务还是会继续执行;

四、拒绝策略

之前提到了,提交任务失败的时候,会执行拒绝操作,在 jdk 中为我们提供了四种策略:

  • abortpolicy:直接抛出 rejectedexecutionexception 异常,这是默认的拒绝策略;
  • callerrunspolicy:由调用线程本身运行任务,以减缓提交速度;
  • discardpolicy:不处理,直接丢弃掉;
  • discardoldestpolicy:丢弃最老的任务,并执行当前任务;

五、executors 工厂方法

另外就是根据线程池参数的不同,executors 为我们提供了4种典型的用法:

singlethreadexecutor:单线程的线程池,提交任务顺序执行;

public static executorservice newsinglethreadexecutor() {
  return new finalizabledelegatedexecutorservice
    (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()));
}

如代码所示,就是最大线程、核心线程都是1,和*队列组成的线程池,提交任务的时候就会,直接将任务加入队列顺序执行;

fixedthreadpool:固定线程数量线程池:

public static executorservice newfixedthreadpool(int nthreads) {
  return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, 
                                new linkedblockingqueue<runnable>());
}

singlethreadexecutor 一样,只是线程数量由用户决定;

cachedthreadpool:动态调节线程池;

public static executorservice newcachedthreadpool() {
  return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, 
                                new synchronousqueue<runnable>());
}

这里核心线程为0,队列是 synchronousqueue 容量为1的阻塞队列,而线程数最大,存活60s,所以有任务的时候直接创建新的线程,超时空闲60s;

scheduledthreadpool:定时任务线程池,功能同 timer 类似,具体细节后续还会讲到;

总结

  • 决定线程池运行逻辑的主要有三个变量,核心线程大小,队列容量,线程池容量
  • 最后发现其实 executors 提供的几种实现,都很典型;但是却容易发生 oom ,所以最好还是自己手动创建比较好;