欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发学习之CountDownLatch、CyclicBarrier以及Semaphore

程序员文章站 2024-03-12 12:38:32
...

一、java.util.concrrent.CountDownLatch

源码

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore

基本使用

从JDK1.5开始的用于多线程控制的CountDownLatch,提供了一个初始值为正数的参数,也就是count,

在循环中每个线程调用它的countDown()使count–,当count=0时,调用它的await()等待为所有需要的线程任务完成。

目的

它不考虑中间线程的执行顺序,只考虑当count reaches zero(count=0)时,将最后一个线程调用。

请看官方示例代码:

<pre> {@code
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }

2个Demo验证是否正确

Demo1 验证:不考虑同学走的顺序,班长最后锁门问题

//模拟班长最后锁门问题
public class CountDownLatchDemo {
    public static void main(String[] args) {
        //模拟6位同学分别进行离开教室的操作
        for (int i = 1; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 同学走了");
            },String.valueOf(i)).start();
        }
        //最后输出班长锁门的情况
        System.out.println(Thread.currentThread().getName()+"=======班长锁门========");
    }
}

结果:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
多执行几次:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore

问题:我们发现不能总得出班长最后锁门。

那么,我们可以使用CountDownLatch类来解决问题。

public class CountDownLatchDemo {
    //模拟班长最后锁门问题
    public static void main(String[] args) throws InterruptedException {
        //模拟5位同学分别进行离开教室的操作
        CountDownLatch countDownLatch = new CountDownLatch(5);//给定值为5,->0
        for (int i = 1; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 同学走了");
                countDownLatch.countDown();//让每执行一次走,countDownLatch的值-1
                },String.valueOf(i)).start();
        }
        //最后输出班长锁门的情况
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+"=======班长锁门========");
    }
}

结果如下:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
多执行几次:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
现在我们就可以控制班长最后锁门的顺序了。


Demo2 验证:秦灭六国,一统华夏

public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);//给定值为5,->0
        for (int i = 1; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 亡了");
                countDownLatch.countDown();//让每执行一次走,countDownLatch的值-1
            },String.valueOf(i)).start();
        }
        //最后输出班长锁门的情况
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+"=======秦扫六合,华夏一统========");
}

结果:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
多执行几次:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore

问题:我们使用CountDownLatch规定当六国灭亡后,秦才真正的统一,那如何将数字变换为六国名称?

那么,我们可以使用枚举Enum来解决问题

public enum CountrySix {
    ONE(1,"齐国"),TWO(2,"楚国"),THREE(3,"魏国"),FOUR(4,"燕国"),FIVE(5,"韩国"),SIX(6,"赵国");
    //首先需要对应i值 1,2,3,4,5,6
    private Integer reCode;
    //其次需要i对应的name,齐楚燕韩赵魏
    private String reMessage;

    CountrySix(Integer reCode, String reMessage) {
        this.reCode = reCode;
        this.reMessage = reMessage;
    }

    public Integer getReCode() {
        return reCode;
    }
    public String getReMessage() {
        return reMessage;
    }

    public static CountrySix foreach_CountryEnum(int i){
        CountrySix[] myArray = CountrySix.values();
        for (CountrySix element : myArray) {
            if(i==element.getReCode()){
                return element;
            }
        }
        return null;
    }
}
public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);//给定值为5,->0
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 亡了");
                countDownLatch.countDown();//让每执行一次走,countDownLatch的值-1
            },CountrySix.foreach_CountryEnum(i).getReMessage()).start();
        }
        //最后输出秦统一的情况
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName()+"=======秦扫六合,华夏一统========");
    }

结果:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
则完成要求!

二、java.util.concrrent.CyclicBarrier

源码

构造方法推荐使用

public CyclicBarrier(int parties, Runnable barrierAction) {//初始值,线程
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

源码示例

* <pre> {@code
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     Runnable barrierAction = () -> mergeRows(...);
 *     barrier = new CyclicBarrier(N, barrierAction);
 *
 *     List<Thread> threads = new ArrayList<>(N);
 *     for (int i = 0; i < N; i++) {
 *       Thread thread = new Thread(new Worker(i));
 *       threads.add(thread);
 *       thread.start();
 *     }
 *
 *     // wait until done
 *     for (Thread thread : threads)
 *       thread.join();
 *   }
 * }}</pre>

基本使用

通过给定的初值与线程,阻塞其他每个线程,最后参数上的线程开始执行。通过++循环并调用CyclicBarrier.await()阻塞其他线程。

目的

同样不考虑中间线程的实际执行过程,阻塞其他每个线程,最后执行参数上的线程。

Demo 验证:CyclicBarrierDemo的使用——七龙珠

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //相比较于CountdownLatch这个是给定初值,直到其他线程完成后,构造函数参数上线程执行
        CyclicBarrier barrier = new CyclicBarrier(7, () -> {
            System.out.println("=======召唤神龙=======");
        });

        for (int i = 1; i <= 7; i++) {
            int finalI = i;
            new Thread(()->{
                System.out.println("收集到第"+ finalI +"颗龙珠");
                try {
                    //阻塞线程
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

执行结果:
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
集齐龙珠,神龙出世~

许个愿?

三、java.util.concurrent.Semaphore

源码

推荐构造方法:初始值,是否公平锁,true->公平,false->非公平

public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore部分关键方法:

=======================================================semaphore.acquire()
//获取锁
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
=======================================================semaphore.release()
//释放锁
public void release() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

基本使用

通过调用Semaphore.acquire()获取锁,之后通过Semaphore.release释放锁,达到共享资源分配的一种效果

相比之前的CountDownLatch、CyclicBarrier 可复用

目的

用于多个线程的互斥作用

用于并发线程数的控制

Demo 验证Semaphore

public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);//座位有3个
        for (int i = 1; i <= 7; i++) {//模拟7个人吃饭,那么必有4个人吃不到需要等待
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"\t抢到位置了");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"\t占有两秒,离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }

结果:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore
多执行几次:

并发学习之CountDownLatch、CyclicBarrier以及Semaphore

并发学习之CountDownLatch、CyclicBarrier以及Semaphore

并发学习之CountDownLatch、CyclicBarrier以及Semaphore