欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程之深入理解Condition

程序员文章站 2024-03-12 12:38:20
...

在并发编程中的开发中,我们难免会使用到等待通知模式,比如我们生产者消费者模式中,当生产者生产的东西填满了容器,则需要停止生产,当消费者把容器内的东西消费完了,也需要停止消费,同样的当容器内有新的东西生产出来,会通知消费者继续生产。可能我们平时使用synchronized比较多,一般我们使用使用object.wait()和object.notify()、notifyAll()。然而今天我们一起学习的是当我们使用jdk提供的并发编程的Lock实现等待通知模式,此时我们就需要使用Condition来实现-------条件等待通知。

一、condition的简单使用

如果我们之前没用过condition,那么先来了解一下如何使用,其实和synchronized结合object.wait()和object.notify()思想是一致的。这里举了一个实例,面包生产者生产面包扔到容器中和面包消费者从容器中取出面包进行消费,当容器满了则生产者阻塞,停止生产,直到容器不满是被唤醒;当容器空了,停止消费,直到容器不空被唤醒。

简单实例代码如下:

BreadContainer.java

package com.taolong.concurrent.condition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author taolong.hong
 * @Date 2020/5/10 16:12
 * @Version 1.0
 * @Description 装面包的容器
 */
public class BreadContainer {

    //锁
    private final ReentrantLock lock;

    private final Condition condition;

    private final List<Bread> breadList;

    private final int containerSize;

    private static final int MAX_SIZE = 20;


    public BreadContainer(int containerSize) {
        this.lock = new ReentrantLock();
        condition = lock.newCondition();
        if (containerSize <=0 || containerSize > MAX_SIZE){
            this.containerSize = MAX_SIZE;
        }else{
            this.containerSize = containerSize;
        }
        breadList = new ArrayList<>(containerSize);
    }

    /***
     * 往容器里添加生产的面包
     */
    public void produceBread(Bread bread){
       lock.lock();
       try {
           //已经装满了,需要等待,并且唤醒阻塞的线程
           while (breadList.size() == containerSize){
               System.out.println("容器已经满了,生产者停止生产...");
               condition.await();
           }
           System.out.println("正在往容器里添加 id="+ bread.getId()+"的面包");
           breadList.add(bread);
           condition.signal();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } finally {
           lock.unlock();
       }
    }

    /***
     * 从容器里拿出面包
     */
    public void consumeBread(){
        lock.lock();
        try {
            //当容器里没有面包则停止消费...
            while(breadList.size() == 0){
                System.out.println("当前容器没有面包,停止消费...");
                condition.await();
            }
            Bread bread = breadList.get(0);
            breadList.remove(0);
            System.out.println("正在消费id="+bread.getId()+"的面包");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


}

Bread.java

package com.taolong.concurrent.condition;

/**
 * @Author taolong.hong
 * @Date 2020/5/8 16:15
 * @Version 1.0
 * @Description 面包
 */
public class Bread {

    private String name;

    private final int id;

    public Bread(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getId() {
        return id;
    }


}

ConditionTest.java

package com.taolong.concurrent.condition;

/**
 * @Author taolong.hong
 * @Date 2020/5/8 16:39
 * @Version 1.0
 * @Description
 */
public class ConditionTest {

    public static void main(String[] args) {
        BreadContainer breadContainer = new BreadContainer(20);
        Thread producer = new Thread(new BreadProducer(breadContainer));
        Thread consumer = new Thread(new BreadConsumer(breadContainer));
        producer.start();
        consumer.start();
    }


    /**
     * 面包生产者
     */
    static class BreadProducer implements Runnable{
        final BreadContainer breadContainer;

        BreadProducer(BreadContainer breadContainer){
            this.breadContainer = breadContainer;
        }

        @Override
        public void run() {
            //生产100000个面包
            for (int i = 0; i < 100000; i++) {
                Bread bread = new Bread(i);
                breadContainer.produceBread(bread);
            }
        }
    }


    /***
     * 面包消费者
     */
    static class BreadConsumer implements Runnable{

        final BreadContainer breadContainer;

        BreadConsumer(BreadContainer breadContainer){
            this.breadContainer = breadContainer;
        }

        @Override
        public void run() {
            int i = 0;
            //消费100000个面包
            while(i<100000){
                breadContainer.consumeBread();
                i++;
            }
        }
    }
}

上面只是一个简单的应用实例,介绍如何使用

二、Condition原理

前面简单介绍了Condition的使用,现在开始介绍Condition的原理,本节主要结合图和文字描述condition的原理,下一节将从源码的角度分析。在分析condition的原理之前,最好先了解AQS的原理,我之前有写过一篇文章,如果不了解的可以参考(并发编程之深入理解ReentrantLock和AQS原理)。先看下面一副condition的原理图
并发编程之深入理解Condition

这里我用不同的颜色线条和文字标注了对应的关系,这里为什么有两个condition等待队列,是因为一个lock可以有多个不同的阻塞条件,比如大家可以参考ArrayBlockingQueue的源码,里面就定义了一个lock,两个condition(notEmpty,notFull)用于提升性能

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

我用下面的文字来解释上面的逻辑图

1、当调用condition.await()方法时,这时创建一个Node节点封装thread信息,加入到condition等待队列的队尾(如果有多个condition,哪一个condition调用了就加入到对应condition的队尾),同时,AQS同步队列中获取锁的线程会释放锁(state同步状态置为0)

2、AQS同步队列中释放锁之后,会唤醒下面一个Node节点,让其去竞争锁资源(修改同步状态state)

3、当AQS同步队列中持有锁的线程调用condition.signal()时,则会将condition等待队列中的第一个节点Node加入到同步队列的队尾(当然也要用自旋cas咯),如果调用的是condition.signalAll()则会将该condition队列的所有的节点唤醒加入到AQS的队尾,再后面的逻辑就和之前分析AQS的逻辑一致。

这里一定要注意,一个lock可以有多个condition,也就意味着有多个condition等待队列,调用不同的condition则处理不同的conditin的等待队列

到这里相信结合图和文字大家对condition的原理已经有了一个比较深刻的了解了,下面开始分析condition的源码

三、深入源码分析

condition是在AbstractQueuedSynchronizer内部类ConditionObject,它实现了Condition接口。我们今天主要分析它的await()和signal()方法,其他的方法大家可自行研究

1、condition.await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //构建Node节点,加入到condition等待队列
    Node node = addConditionWaiter();
    //释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断node是否在AQS同步队列中,如果不在则阻塞
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //下面的逻辑就是在AQS同步队列中了,和之前分析的AQS一样的
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

上面代码逻辑分析如下

(1)创建节点,并且加入到condition的等待队列的队尾

(2)当前线程需要释放同步状态(将state置为0),并且唤醒AQS后继节点,让其竞争锁

(3)判断node是否在AQS中,此时node一般是下面几种情况:
在condition队列中:处于等待队列中,状态为CONDITION,需要等待调用signal,加入到AQS同步队列
在AQS同步队列中:在AQS队列中,如果被前一个节点唤醒,则有竞争锁的资格,状态为SIGNAL,一般是调用 condition.signal()后从condition的等待队列转移到AQS队列中

取消状态:过时或者状态为取消状态CANCELED,则会剔除出队列

下面一个个重要的方法来分析,先看addConditionWaiter()

2、addConditionWaiter()

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

这段代码比较简单,结合上面的图,一目了然,就是将新创建的node节点加入到condition队列的尾部。这里说一下为什么没有使用cas加入到condition等待队列的队尾呢,因为这个时候锁还没有释放,所以没有竞争的情况。

接下来看fullyRelease(node)释放锁

3、fullyRelease(node)

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //获取同步状态
        int savedState = getState();
        //释放锁,这里注意是直接递减至0,而不是-1
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

上面的逻辑也非常简单,就是释放同步状态,但是是全部释放,而不是将state-1(因为当重入是state>1),此时如果释放成功了,则将将AQS的头节点移除,并且唤醒后继节点,让其竞争锁,release()的逻辑这里就不在继续将了,如果不了解的可以看之前的文章(并发编程之深入理解ReentrantLock和AQS原理)。此时上面condition.await()中的while循环会阻塞,直到调用condition.signal()或者condition.signalAll(),将该node从condition等待队列中移到AQS同步队列的尾部。接下来我们就看condition.signal()方法

4、condition.signal()

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

这里很简单,首先判断当前线程是否持有锁,如果没有持有则会抛异常,也就是说持有锁(同步状态)的线程才有资格调用signal()方法,下面看doSignal()

5、doSignal()

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

这里也很简单,使用do-while,先将firstwaiter后移动一位,找到一个可以有效的node节点移动到AQS队列中,我们看transferForSignal(first),看看它是如何将first节点移动到AQS队列中的

6、transferForSignal()

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //首先要将node节点状态从CONDITION改变成0,否则则认为该节点被取消,则剔除
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //使用自旋的cas将node节点加入到AQS队列中,返回的是node的前一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //前面一个节点不管是CANCELLED状态还是设置signal失败,都唤醒node节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒节点
        LockSupport.unpark(node.thread);
    return true;
}

这段逻辑也不难,首先使用cas将node从CONDITION状态编程0,如果失败则表示该节点已取消(CANCELLED)则不用转移到AQS同步队列;否则使用enq(node)自旋cas加入到AQS同步队列,这个enq(node)方法在之前线程竞争锁失败并加入AQS同步队列时机也调用了,之前的文章分析过,这里不再继续分析。接着看最后一个if条件,前一个节点的状态>0(CANNELLED)或者使用cas尝试将前一个节点的状态修改成SIGNAL,失败后则唤醒当前节点,为什么要这么做呢?

(1)如果前一个节点时SIGNAL状态,则不需要唤醒当前节点,因为AQS队列自动会通知前一个节点(如果通知前没有取消的话),能保证当前节点一定会被唤醒

(2)如果前一个节点时取消,或者无法将前一个节点修改成SIGNAL状态,则当前节点有可能唤醒不了,因为前一个节点不会通知它,此时就需要手动唤醒,当手动唤醒后,该节点会执行下面的方法,将它前面取消状态(CANCELED)移除,这样就保证自己有机会被唤醒竞争锁。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这个方法在之前的文章分析过,这里不再继续分析。到这里基本上所有的逻辑都执行完了,此时该节点已经正常进入了同步AQS队列,和之前竞争锁失败进入同步AQS队列效果是一样的。后面竞争锁的逻辑也是一样的,这里就不再继续分析。

前面描述的在之前的文章分析过,该文章就是
并发编程之深入理解ReentrantLock和AQS原理),本文就分享到这里,希望可以给大家带来一点帮助,同时如果有错误,欢迎大家指正!