欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java多线程系列--“JUC锁”06之 Condition条件

程序员文章站 2023-01-29 16:10:36
synchronized配合Object的wait()、notify()系列方法可以实现等待/通知模式。 Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。 Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait ......

synchronized配合object的wait()、notify()系列方法可以实现等待/通知模式。

lock提供了条件condition,对线程的等待、唤醒操作更加详细和灵活。

condition的作用是对锁进行更精确的控制。condition中的await()方法相当于object的wait()方法,condition中的signal()方法相当于object的notify()方法,condition中的signalall()相当于object的notifyall()方法。

一、condition实现生产者消费者问题

class boundedbuffer {
    final lock lock = new reentrantlock();
    // condition 依赖于 lock 来产生
    final condition notfull = lock.newcondition();
    final condition notempty = lock.newcondition();

    final object[] items = new object[100];
    int putptr, takeptr, count;

    // 生产
    public void put(object x) throws interruptedexception {
        lock.lock();
        try {
            while (count == items.length)
                notfull.await();  // 队列已满,等待,直到 not full 才能继续生产
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notempty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public object take() throws interruptedexception {
        lock.lock();
        try {
            while (count == 0)
                notempty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
            object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notfull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

二、lock与condition关系

    // 一个lock可以new多个条件
    public static void main(string[] args) {
        reentrantlock lock = new reentrantlock();
        lock.lock();
        condition newcondition1 = lock.newcondition();
        newcondition1.await();
        newcondition1.signal();
        condition newcondition2 = lock.newcondition();
        newcondition2.await();
        newcondition2.signal();
        lock.unlock();
    }

    // reentrantlock.newcondition()
    public condition newcondition() {
        return sync.newcondition();
    }
    
    // reentrantlock.sync.newcondition()
    final conditionobject newcondition() {
        return new conditionobject();
    }

三、条件队列

每个condition对象都包含着一个fifo队列,该队列是condition对象通知/等待功能的关键。在队列中每一个节点(使用的aqs的节点)都包含着一个线程引用,该线程就是在该condition对象上等待的线程。

public class conditionobject implements condition, java.io.serializable {
    private static final long serialversionuid = 1173984872572414699l;
    private transient node firstwaiter; //头节点
    private transient node lastwaiter; //尾节点
    public conditionobject() {
    }
    
    /**
     * 通过addconditionwaiter()方法理解等待队列数据结构
     * 将当前线程加入条件等待队列
     * 1.node就是aqs的node
     * 2.单向链表,通过nextwaiter连接
     * 3.waitstatus==node.condition才能在等待队列中
     */
    private node addconditionwaiter() {
        node t = lastwaiter;
        if (t != null && t.waitstatus != node.condition) {
            unlinkcancelledwaiters();
            t = lastwaiter;
        }
        node node = new node(thread.currentthread(), node.condition);
        if (t == null)
            firstwaiter = node;
        else
            t.nextwaiter = node;
        lastwaiter = node;
        return node;
    }
    
    /**
     * 清除队列中不是等待状态的线程
     */
    private void unlinkcancelledwaiters() {
        node t = firstwaiter;
        node trail = null; // 用于保存前一个节点
        while (t != null) {
            node next = t.nextwaiter;
            if (t.waitstatus != node.condition) {
                t.nextwaiter = null;
                if (trail == null)
                    firstwaiter = next;
                else
                    trail.nextwaiter = next;
                if (next == null)
                    lastwaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
}

四、等待await()

    public final void await() throws interruptedexception {
        if (thread.interrupted())
            throw new interruptedexception();
        node node = addconditionwaiter(); // 当前线程new node()加入条件队列
        int savedstate = fullyrelease(node); // 释放当先线程的锁
        int interruptmode = 0;
        
        /**
         * 自旋:
         * 1.当前节点不在同步队列(刚new的节点肯定不在),挂起当前线程,等待被唤醒
         * 2.当其他线程调用同一个conditionobject的signal方法时,会将队列里的节点放入同步队列,并unpark线程(排队唤醒)
         * 3.如果该节点被唤醒,再自旋检查是否在同步队列。发现已经在队列中,就可以跳出循环,获取lock
         */
        while (!isonsyncqueue(node)) {
            locksupport.park(this);
            if ((interruptmode = checkinterruptwhilewaiting(node)) != 0) // 处理打断
                break;
        }
        if (acquirequeued(node, savedstate) && interruptmode != throw_ie) // 获取锁
            interruptmode = reinterrupt;
        if (node.nextwaiter != null) // clean up if cancelled
            unlinkcancelledwaiters();
        if (interruptmode != 0)
            reportinterruptafterwait(interruptmode);
    }

五、唤醒signal()

public final void signal() {
        if (!isheldexclusively()) // 检查是否拿到锁
            throw new illegalmonitorstateexception();
        node first = firstwaiter;
        if (first != null)
            dosignal(first);
    }
    
    private void dosignal(node first) {
        do {
            if ( (firstwaiter = first.nextwaiter) == null)
                lastwaiter = null;
            first.nextwaiter = null;
        } while (!transferforsignal(first) && // 唤醒队列第一个节点
                 (first = firstwaiter) != null);
    }
    
    final boolean transferforsignal(node node) {
        if (!compareandsetwaitstatus(node, node.condition, 0))
            return false;
        node p = enq(node); // 加入同步队列
        int ws = p.waitstatus;
        if (ws > 0 || !compareandsetwaitstatus(p, ws, node.signal))
            locksupport.unpark(node.thread); // 唤醒线程
        return true;
    }

 

await() :造成当前线程在接到信号或被中断之前一直处于等待状态。

await(long time, timeunit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。

awaitnanos(long nanostimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanostimesout之前唤醒,那么返回值 = nanostimeout – 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。

awaituninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。

awaituntil(date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。

signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与condition相关的锁。

signal()all:唤醒所有等待线程。能够从等待方法返回的线程必须获得与condition相关的锁。

 

参考资料 / 相关推荐

一行一行源码分析清楚 abstractqueuedsynchronizer (二)

【死磕java并发】—–j.u.c之condition

java多线程系列--“juc锁”06之 condition条件