欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

JUC—CountDownLatch闭锁源码深度解析

程序员文章站 2022-12-20 13:16:20
基于JDK1.8详细介绍了CountDownLatch闭锁的原理和应用,以及CountDownLatch对于AQS框架的巧妙使用!...

基于JDK1.8详细介绍了CountDownLatch闭锁的原理和应用,以及CountDownLatch对于AQS框架的巧妙使用!



1 CountDownLatch的概述

public class CountDownLatch
extends Object

CountDownLatch来自于JDK1.5的JUC包,是一种同步工具,常被称为“闭锁”,也叫做“倒计数器”。在完成一组正在其他线程中执行的操作之前,CountDownLatch允许一个或多个线程一直等待。
很明显,这类似于在开始某个行为之前的准备操作。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。或者说CountDownLatch是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。
想要真正明白CountDownLatch的原理,必然离不开AQS!

2 CountDownLatch的原理

2.1 基本结构

JUC—CountDownLatch闭锁源码深度解析
通过uml类图可知,CountDownLatch内部同样使用的AQS来实现它的功能的,我们可以大胆猜测,CountDownLatch的这个“倒计数”操作和AQS的state同步状态属性有关。内部类Sync实现了AbstractQueuedSynchronizer,这就类似于锁了。
CountDownLatch的构造函数接收一个int类型的count参数作为计数器,如果你想等待N个计数操作,这里就传入N。通过构造函数,实际上是把count的值赋给了AQS 的同步状态属性state ,也就是这里使用AQS 的state状态值来表示计数器值。
CountDownLatch的Sync实现中,重写了tryAcquireShared和tryReleaseShared方法,很明显是一个共享锁的实现。
一般情况下我们在释放锁的时候会将state资源增加,获得锁的时候会将state资源减少,但是CountDownLatch则不一样:

  1. 在尝试获取锁的tryAcquireShared方法中,仅仅是判断如果state为0,就表示获得了锁,其他情况下都表示没有获得锁。
  2. 而在尝试释放锁的tryReleaseShared方法中,虽然名曰释放锁,但是却是在对state尝试自减操作,它的内部是一个循环操作,每一次的调用tryReleaseShared都会首先判断state是否为0,如果是,那么返回false表示“释放锁失败”,如果不是那么尝试CAS的将state自减1,CAS成功之后会判断此时的值是否为0,如果不是那么表示“释放锁失败”,返回false,否则表示“释放锁成功”,返回true,这里的操作可以永远保证只有一个线程能够因为“释放锁成功”而返回true。
/**
 * 倒计数的同步控制,使用AQS的state表示倒计数,和一般的“锁”实现不一样
 */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; /**
     * 构造器
     *
     * @param count 计数器
     */ Sync(int count) { //设置state初始值 setState(count); } /**
     * @return 获取计数器值,实际上就是获取state值
     */ int getCount() { return getState(); } /**
     * 尝试共享式获取锁
     * 实际上仅仅是一个判断操作,只有state=0的时候才会返回1
     *
     * @param acquires 参数,在实现的时候可以传递自己想要的数据,这里没什么用
     * @return 返回大于等于0的值表示获取成功,否则失败。
     */ protected int tryAcquireShared(int acquires) { //判断state是否等于0,如果是那么返回1,否则返回-1 return (getState() == 0) ? 1 : -1; } /**
     * 尝试共享式释放锁
     * 实际上仅仅是一个判断-自减操作,只有state=0的时候才会返回true
     *
     * @param releases 参数,在实现的时候可以传递自己想要的数据,这里没什么用
     * @return 返回true表示释放成功,否则失败。
     */ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero /*
         * 开启一个循环,实际上是state的自减以及是否唤醒等待线程的操作
         */ for (; ; ) { //获取state的值c int c = getState(); //如果c为0,那么返回false,表示释放失败 if (c == 0) return false; //否则c大于0,尝试CAS更新state为state-1,更新失败直接重试 int nextc = c - 1; if (compareAndSetState(c, nextc)) //CAS成功之后再次判断nextc是否为0 //如果为0,说明是最后一个CAS成功的线程,返回true;如果不为0,说明不是最后一个CAS成功的线程,返回false //这样可以通过CAS控制永远只有一条线程能够返回true,随后唤醒因调用CountDownLatch 的await 方法而被阻塞的线程 return nextc == 0; } } } private final Sync sync; /**
 * 构造一个用给定计数初始化的 CountDownLatch。
 *
 * @param count 在线程能通过 await() 之前,必须调用 countDown() 的次数
 * @throws IllegalArgumentException 如果 count 为负数
 */ public CountDownLatch(int count) { //count校验 if (count < 0) throw new IllegalArgumentException("count < 0"); //初始化Sync,使用传的参数 this.sync = new Sync(count); } 

这样的“反常规”操作有什么用呢?别急,看看CountDownLatch相关方法就知道了!

2.2 await()方法

public void await()

需要等待的线程调用。调用该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:

  1. 当计数器的值为0 时;
  2. 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException 异常,然后返回。

根据源码,想要调用await方法的线程能够返回,一般情况下需要获取到共享锁,而CountDownLatch内部的tryAcquireShared返回大于0的要求是state为0,即只有在state为0的时候,调用await方法的线程才能能够返回。

/**
 * CountDownLatch 的await方法
 *
 * @throws InterruptedException 等待时被中断
 */ public void await() throws InterruptedException { //调用了AQS 的acquireSharedInterruptibly方法,共享式可中断获取锁 sync.acquireSharedInterruptibly(1); } /**
 * AQS 的acquireSharedInterruptibly方法
 * 共享式获取同步状态,可以被中断,在AQS部分我们已经讲过了
 *
 * @param arg 参数,在实现的时候可以传递自己想要的数据,这里没什么用
 * @throws InterruptedException 等待时被中断
 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //如果线程被中断则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //tryAcquireShared方法由AQS的子类实现,尝试共享式获取锁,如果返回值小于0,表示获取失败 if (tryAcquireShared(arg) < 0) //获取锁失败的线程进入AQS的队列等待,在被唤醒之后还是会继续调用tryAcquireShared获取锁,直到获得锁成功 doAcquireSharedInterruptibly(arg); } 

2.3 await(timeout, unit)方法

public boolean await(long timeout, TimeUnit unit)

需要等待的线程调用。调用该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:

  1. 当计数器值为0 时,这时候会返回true ;
  2. 设置的timeout 时间到了,因为超时而返回false ;
  3. 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException 异常,然后返回。
    可以发现,和await方法一样,即只有在state为0的时候,调用await方法的线程才能能够正常返回true,同时加入了超时操作,一段时间范围内state还不为0,则失败返回false。
/**
 * CountDownLatch 的await( timeout, unit)方法
 * 超时等待
 *
 * @param timeout 等待时间
 * @param unit    时间单位
 * @return true 成功 false 失败
 * @throws InterruptedException 被中断
 */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { //调用了AQS 的tryAcquireSharedNanos方法,共享式超时可中断获取锁 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /**
 * AQS 的tryAcquireSharedNanos方法
 * 共享式超时获取锁,可以被中断,在AQS部分我们已经讲过了
 *
 * @param arg          参数
 * @param nanosTimeout 超时时间,纳秒
 * @return 是否获取锁成功
 * @throws InterruptedException 被中断
 */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //最开始就检查一次,如果当前线程是被中断状态,直接抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //下面是一个||运算进行短路连接的代码 //tryAcquireShared尝试获取锁,获取到了(返回大于等于0)直接返回true //获取不到(左边表达式为false) 就执行doAcquireSharedNanos方法 //doAcquireSharedNanos等待一段时间,直到途中计数器变成了0就返回,或者时间到了自动返回,或者等待时被中断 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } 

2.4 countDown()方法

public void countDown()

需要准备线程调用。如果当前计数(也就是state)等于0,则什么也不做;如果当前计数大于0,则尝试CAS将计数器递减1,递减成功如果新的计数为零,出于线程调度目的,将唤醒所有的因为调用await而等待的线程。
这个countDown方法不会阻塞调用该方法的线程!

/**
 * CountDownLatch的countDown方法
 */ public void countDown() { //调用了AQS 的releaseShared方法,共享式释放锁 sync.releaseShared(1); } /**
 * AQS 的releaseShared方法,共享式释放锁
 *
 * @param arg 参数
 * @return true 成功 false 失败
 */ public final boolean releaseShared(int arg) { //tryReleaseShared方法由AQS的子类实现,尝试共享式获取锁,如果返回值小于0,表示获取失败 //在CountDownLatch的Sync子类实现中,如果state自减之后为0,则返回true if (tryReleaseShared(arg)) { /*state自减之后为0,调用doReleaseShared唤醒因调用CountDownLatch 的await 方法而被阻塞的线程*/ doReleaseShared(); return true; } //state为0或者state自减之后不为0,那么不调用AQS 的doReleaseShared方法,不会唤醒因调用CountDownLatch 的await 方法而被阻塞的线程 return false; } 

2.5 getCount()方法

public long getCount()

获取当前计数器的值,也就是AQS 的state 的值。

/**
 * CountDownLatch的方法
 * @return
 */ public long getCount() { return sync.getCount(); } 

3 CountDownLatch的使用

在JDK1.5之前,为了完成CountDownLatch的功能,我们通常使用thread.join方法方法
在JDK 1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,并且比join的功能更多。并且配合线程池实现更加优雅的编码,传统join方法无法在线程池中使用join。
案例:要求等两个子线程执行完毕之后,主线程才能开始执行。

class CountDownLatchTest { /**
     * 传统join实现线程等待
     */ static class JoinRun { public static void main(String[] args) throws InterruptedException { Runnable run = () -> { try { Thread.sleep(100); System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); }; Thread parser1 = new Thread(run, " 1 "); Thread parser2 = new Thread(run, " 2 "); parser1.start(); parser2.start(); System.out.println("主线程等待2个子线程执行完毕"); parser1.join(); parser2.join(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } } /**
     * CountDownLatch实现线程等待
     */ static class CountDownLatchRun1 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); Runnable run = () -> { try { Thread.sleep(100); System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); }; Thread parser1 = new Thread(run, " 1 "); Thread parser2 = new Thread(run, " 2 "); parser1.start(); parser2.start(); System.out.println("主线程等待2个子线程执行完毕"); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } } /**
     * CountDownLatch配合线程池使用
     */ static class CountDownLatchRun2 { public static void main(String[] args) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(2); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); Runnable run = () -> { try { Thread.sleep(100); System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); }; threadPoolExecutor.execute(run); threadPoolExecutor.execute(run); threadPoolExecutor.shutdown(); System.out.println("主线程等待2个子线程执行完毕"); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } } } 

4 CountDownLatch的总结

CountDownLatch巧妙地利用了AQS的共享锁的实现原理,构造器要求传入N个点,实际上就是state的初始值。一个线程调用await方法会阻塞当前线程,直到state变成零。而当我们调用一次countDown方法时,state就会尝试自减1。
在tryAcquireShared中只有state为0才表示“获取到了锁”,否则就会阻塞调用线程;而在tryReleaseShared中只有state自减之后值为0才表示“成功释放了锁”,即只有当某个countDown方法将state变成0的时候,此时表示“成功释放了锁”,随后就会唤醒因为调用await方法而阻塞的线程,被唤醒的线程会判断到此时state=0,因此可以返回!
可以发现CountDownLatch对于state的描述和普通锁不一样:countDown释放锁的时候要求state大于0,并且state会反向自减;await获取锁的时候则要求state等于0。这个state就是一个倒计数!这也从侧面反映出了AQS功能的强大,我们可以借用AQS非常简单的实现自己的同步组件,而不仅仅拘泥于“锁”!
countDown方法可以用在任何地方,这里的初始值N,可以是N个线程执行完毕之后调用N次countDown方法,也可以是1个线程里的N次调用countDown方法。
计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。另外,CountDownLatch不能重新初始化或者修改CountDownLatch对象的count值,因此一个倒计数器只能使用一次!
CountDownLatch一般用来确保某些活动直到其他活动都完成才继续执行,比如:

  1. 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  2. 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
  3. 等待直到某个操作所有参与者都准备就绪再继续执行。

CountDownLatch的源码看起来非常简单,那是因为复杂的线程等待、唤醒机制都被AQS同步器框架实现了,如果想要真正了解CountDownLatch的原理,那么AQS的实现必须要了解。同时AQS也是JUC中基本上所有的锁和同步组件的实现基石,比如我们现在讲的CountDownLatch。本文没有讲解AQS的原理,因为那实在太多了,如果真的想要学习AQS,那么看看下面的文章吧!

本文地址:https://blog.csdn.net/weixin_43767015/article/details/108035192