并发学习之CountDownLatch、CyclicBarrier以及Semaphore
程序员文章站
2024-03-12 12:38:32
...
文章目录
一、java.util.concrrent.CountDownLatch
源码
基本使用
从JDK1.5开始的用于多线程控制的CountDownLatch,提供了一个初始值为正数的参数,也就是count,
在循环中每个线程调用它的countDown()使count–,当count=0时,调用它的await()等待为所有需要的线程任务完成。
目的
它不考虑中间线程的执行顺序,只考虑当count reaches zero(count=0)时,将最后一个线程调用。
请看官方示例代码:
<pre> {@code
* class Driver { // ...
* void main() throws InterruptedException {
* CountDownLatch startSignal = new CountDownLatch(1);
* CountDownLatch doneSignal = new CountDownLatch(N);
*
* for (int i = 0; i < N; ++i) // create and start threads
* new Thread(new Worker(startSignal, doneSignal)).start();
*
* doSomethingElse(); // don't let run yet
* startSignal.countDown(); // let all threads proceed
* doSomethingElse();
* doneSignal.await(); // wait for all to finish
* }
* }
2个Demo验证是否正确
Demo1 验证:不考虑同学走的顺序,班长最后锁门问题
//模拟班长最后锁门问题
public class CountDownLatchDemo {
public static void main(String[] args) {
//模拟6位同学分别进行离开教室的操作
for (int i = 1; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 同学走了");
},String.valueOf(i)).start();
}
//最后输出班长锁门的情况
System.out.println(Thread.currentThread().getName()+"=======班长锁门========");
}
}
结果:
多执行几次:
问题:我们发现不能总得出班长最后锁门。
那么,我们可以使用CountDownLatch类来解决问题。
public class CountDownLatchDemo {
//模拟班长最后锁门问题
public static void main(String[] args) throws InterruptedException {
//模拟5位同学分别进行离开教室的操作
CountDownLatch countDownLatch = new CountDownLatch(5);//给定值为5,->0
for (int i = 1; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 同学走了");
countDownLatch.countDown();//让每执行一次走,countDownLatch的值-1
},String.valueOf(i)).start();
}
//最后输出班长锁门的情况
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"=======班长锁门========");
}
}
结果如下:
多执行几次:
现在我们就可以控制班长最后锁门的顺序了。
Demo2 验证:秦灭六国,一统华夏
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);//给定值为5,->0
for (int i = 1; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 亡了");
countDownLatch.countDown();//让每执行一次走,countDownLatch的值-1
},String.valueOf(i)).start();
}
//最后输出班长锁门的情况
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"=======秦扫六合,华夏一统========");
}
结果:
多执行几次:
问题:我们使用CountDownLatch规定当六国灭亡后,秦才真正的统一,那如何将数字变换为六国名称?
那么,我们可以使用枚举Enum来解决问题
public enum CountrySix {
ONE(1,"齐国"),TWO(2,"楚国"),THREE(3,"魏国"),FOUR(4,"燕国"),FIVE(5,"韩国"),SIX(6,"赵国");
//首先需要对应i值 1,2,3,4,5,6
private Integer reCode;
//其次需要i对应的name,齐楚燕韩赵魏
private String reMessage;
CountrySix(Integer reCode, String reMessage) {
this.reCode = reCode;
this.reMessage = reMessage;
}
public Integer getReCode() {
return reCode;
}
public String getReMessage() {
return reMessage;
}
public static CountrySix foreach_CountryEnum(int i){
CountrySix[] myArray = CountrySix.values();
for (CountrySix element : myArray) {
if(i==element.getReCode()){
return element;
}
}
return null;
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);//给定值为5,->0
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 亡了");
countDownLatch.countDown();//让每执行一次走,countDownLatch的值-1
},CountrySix.foreach_CountryEnum(i).getReMessage()).start();
}
//最后输出秦统一的情况
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"=======秦扫六合,华夏一统========");
}
结果:
则完成要求!
二、java.util.concrrent.CyclicBarrier
源码
构造方法推荐使用
public CyclicBarrier(int parties, Runnable barrierAction) {//初始值,线程
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
源码示例
* <pre> {@code
* class Solver {
* final int N;
* final float[][] data;
* final CyclicBarrier barrier;
*
* class Worker implements Runnable {
* int myRow;
* Worker(int row) { myRow = row; }
* public void run() {
* while (!done()) {
* processRow(myRow);
*
* try {
* barrier.await();
* } catch (InterruptedException ex) {
* return;
* } catch (BrokenBarrierException ex) {
* return;
* }
* }
* }
* }
*
* public Solver(float[][] matrix) {
* data = matrix;
* N = matrix.length;
* Runnable barrierAction = () -> mergeRows(...);
* barrier = new CyclicBarrier(N, barrierAction);
*
* List<Thread> threads = new ArrayList<>(N);
* for (int i = 0; i < N; i++) {
* Thread thread = new Thread(new Worker(i));
* threads.add(thread);
* thread.start();
* }
*
* // wait until done
* for (Thread thread : threads)
* thread.join();
* }
* }}</pre>
基本使用
通过给定的初值与线程,阻塞其他每个线程,最后参数上的线程开始执行。通过++循环并调用CyclicBarrier.await()阻塞其他线程。
目的
同样不考虑中间线程的实际执行过程,阻塞其他每个线程,最后执行参数上的线程。
Demo 验证:CyclicBarrierDemo的使用——七龙珠
public class CyclicBarrierDemo {
public static void main(String[] args) {
//相比较于CountdownLatch这个是给定初值,直到其他线程完成后,构造函数参数上线程执行
CyclicBarrier barrier = new CyclicBarrier(7, () -> {
System.out.println("=======召唤神龙=======");
});
for (int i = 1; i <= 7; i++) {
int finalI = i;
new Thread(()->{
System.out.println("收集到第"+ finalI +"颗龙珠");
try {
//阻塞线程
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
执行结果:
集齐龙珠,神龙出世~
许个愿?
三、java.util.concurrent.Semaphore
源码
推荐构造方法:初始值,是否公平锁,true->公平,false->非公平
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore部分关键方法:
=======================================================semaphore.acquire()
//获取锁
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
=======================================================semaphore.release()
//释放锁
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
基本使用
通过调用Semaphore.acquire()获取锁,之后通过Semaphore.release释放锁,达到共享资源分配的一种效果
相比之前的CountDownLatch、CyclicBarrier 可复用
目的
用于多个线程的互斥作用
用于并发线程数的控制
Demo 验证Semaphore
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//座位有3个
for (int i = 1; i <= 7; i++) {//模拟7个人吃饭,那么必有4个人吃不到需要等待
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t抢到位置了");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"\t占有两秒,离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
结果:
多执行几次:
推荐阅读
-
Java并发编程工具类:CountDownLatch、CyclicBarrier、Semaphore
-
并发学习之CountDownLatch、CyclicBarrier以及Semaphore
-
Java并发学习笔记(九):Semaphore、CountdownLatch、CyclicBarrier
-
Java并发6:CountDownLatch &CyclicBarrier &Semaphore
-
并发编程——彻底掌握CountDownLatch,CyclicBarrier和Semaphore
-
Java并发编程系列---Java中的并发工具类CountDownLatch、CyclicBarrier、Semaphore、Exchanger
-
Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
-
Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
-
并发编程CountDownLatch,CyclicBarrier,Semaphore实现原理分析
-
并发编程CountDownLatch,CyclicBarrier,Semaphore实现原理分析