Java多线程Day19-JUC锁之公平锁
程序员文章站
2022-04-03 10:20:24
公平锁基本概念AQS锁CLH队列CAS函数ReentrantLock基本概念AQS锁AQS: AbstractQueuedSynchronizerAQS是Java中管理锁的抽象类,包含锁实现的公共方法AQS是独占锁和共享锁的公共父类AQS锁的类别:独占锁:锁在一个时间点只会被一个线程所占有根据锁的获取机制,独占锁又分为公平锁和非公平锁公平锁: 通过CLH等待线程队列按照先来先得的规则,公平地获取锁非公平锁: 当线程需要获取锁时,无视CLH等待线程队列直接获取锁...
公平锁
基本概念
AQS锁
-
AQS: AbstractQueuedSynchronizer
- AQS是Java中管理锁的抽象类,包含锁实现的公共方法
- AQS是独占锁和共享锁的公共父类
-
AQS锁的类别:
-
独占锁:
- 锁在一个时间点只会被一个线程所占有
- 根据锁的获取机制,独占锁又分为公平锁和非公平锁
- 公平锁: 通过CLH等待线程队列按照先来先得的规则,公平地获取锁
- 非公平锁: 当线程需要获取锁时,无视CLH等待线程队列直接获取锁
- 独占锁实例 : ReentrantLock, ReentrantReadWriteLock, WriteLock
-
共享锁:
- 锁在一个时间点可以被多个线程同时拥有,可以共享的锁
- 共享锁实例 : ReadLock, CyclicBarrier, CountDownLatch, Semaphore
-
独占锁:
CLH队列
-
CLH: Craig,Ladin and Hagersten lock queue
-
CLH队列是AQS等待锁的线程队列
- 通过锁来保护竞争资源不会被多个线程同时操作而引发错误
- 在独占锁中,竞争资源在同一个时间点只能被一个线程锁访问,其余线程则需要等待
- CLH队列就是用来管理这些等待锁的线程队列
- CLH是一个非阻塞的FIFO队列,往CLH等待队列中插入或者移除一个节点的时候,在并发的条件下不会阻塞,而是通过自旋锁和CAS保证节点插入和移除的原子性
-
CLH队列是AQS等待锁的线程队列
CAS函数
-
CAS: Compare And Swap
- 比较交换函数,是一个原子操作的函数.也就是说,通过CAS操作的数据都是以原子的方式进行的
- 比如compareAndSetHead(), compareAndSetTail(), compareAndSetNext(). 这些函数的共同特点就是所执行的动作都是以原子的方式执行的
ReentrantLock
- ReentrantLock的UML图:
- ReentrantLock实现了Lock接口
-
ReentrantLock和Sync是组合关系:
- ReentrantLock中包含了Sync对象
- Sync是AQS的子类
- Sync有两个子类是公平锁FairSync和非公平锁NonFairSync
- ReentrantLock是一个独占锁,至于是公平锁还是非公平锁就取决于sync对象是FairSync的实例还是NonFairSync的实例
获取公平锁
lock
- lock() 在ReentrantLock中的FairSync类中实现
final void lock() {
acquire(1);
}
- 当前线程是通过acquire(1) 获取锁的
-
1 的含义: 锁的状态的参数
- 对于独占锁来说,当锁处于可获取状态时,状态值为0
- 对于独占锁来说,当锁初次被线程获取到时,状态值为1
- 因为ReentrantLock是可重入锁,所以独占锁可以被单个线程多次获取,每获取1次锁的状态值就会加1
acquire
- acquire()是在AQS中实现的:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 当前线程首先通过tryAcquire() 尝试获取锁,如果获取锁成功,就直接返回;如果尝试获取锁失败,则进入到等待队列中等待, 因为前面可能会有线程正在等待该锁
- 当前线程在执行失败的情况下,首先通过addWaiter(Node.EXCLUSIVE) 将当前线程加入到非阻塞的FIFO队列CLH队列的末尾
- 在执行完addWaiter(Node.EXCLUSIVE) 方法之后,会通过aquiredQueued() 方法来获取锁,因为ReentrantLock是公平锁,所以会根据公平原则来获取锁
- 当前线程在执行acquireQueued() 时,会进入到CLH队列中休眠等待,直到获取锁了才返回
- 如果当前线程在休眠等待的过程中被中断过,那么acquireQueued() 方法会返回true
- 此时当前线程会调用selfInterrupt() 来自身产生一个中断
tryAcquire
- 公平锁的tryAcquire() 在ReentrantLock中的FairSync类中实现
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取独占锁的状态
int c = getState();
if (c == 0) {
/*
* 如果锁未被任何线程持有,则判断当前线程是不是CLH队列中的第一个线程
* - 如果是的话,则获取该锁,设置锁的状态,并设置锁的拥有者为当前线程
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果独占锁已经被当前线程持有,则更新锁的状态
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
-
tryAcquire() 的作用是尝试获取锁
- 如果获取成功则返回true
- 如果获取失败则返回false
- 后续通过其余的方式再来获取锁
hasQueuedPredecessors
- hasQueuedPredecessors() 在AQS中实现:
public final boolean hasQueuedPredecessors() {
/*
* 正确性取决于头部是否在头尾之间被初始化
* 如果当前线程队列中的首个线程那么结果将会是精确的
*/
// 按照反向初始化顺序读取字段
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- hasQueuedPredecessors() 通过判断 “当前线程” 是否在CLH队列的头部来返回AQS中是否存在有比 “当前线程” 等待更久的线程
Node
- Node就是CLH队列的节点
- Node在AQS中实现,数据结构如下:
/**
* 等待队列的头部,延迟初始化
* 除初始化外,只能通过setHead方法进行修改
* 如果队列的头部head已经存在,节点的状态waitStatus不会为CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的尾部,延迟初始化
* 只能通过enq方法增加一个新的等待节点
*/
private transient volatile Node tail;
static final class Node {
/** 指定一个节点在共享模式下等待的标识 */
static final Node SHARED = new Node();
/** 指定一个节点在独占模式下等待的标识 */
static final Node EXCLUSIVE = null;
/** 表示线程已被取消的waitStatus的值 */
static final int CANCELLED = 1;
/**
* 表示当前线程的后继线程需要被唤醒unpark的waitStatus的值
* - 当前线程的后继线程处于阻塞状态
* - 而当前线程被release或者cancel时,需要唤醒unpark当前线程的后继线程
*/
static final int SIGNAL = -1;
/** 表示处于Condition休眠状态的线程等待Condition唤醒的waitStatus的值 */
static final int CONDITION = -2;
/**
* 表示其余线程获取到共享锁的线程
*/
static final int PROPAGATE = -3;
/**
* 状态属性waitStatus,包括以下几个值:
* SIGNAL: 当节点的后继者处于或者将要处于阻塞状态时,在当前线程被release或者cancel时,需要唤醒当前线程的后继线程.为了避免竞争,获取方法首先要获取SINGAL状态,然后才会尝试原子获取
* CANCELLED: 超时或者中断时线程取消.节点不会离开这个状态,也就是说,取消状态的线程不会再次阻塞
* CONDITION: 当前节点在一个条件队列中,在状态变化之前不会作为同步队列的节点,此时的状态将会设置为0
* PROPAGATE: 共享锁应该传播给其余的节点,在doReleaseShared中设置的
* 0: 当前线程不属于任何一种状态
* 非负的值表示节点不需要发信号signal,因此大多数代码仅需要检查正负符号,而不需要检查特定的值
*/
volatile int waitStatus;
/**
* 上一个节点
*/
volatile Node prev;
/**
* 下一个节点
*/
volatile Node next;
/**
* 节点对应的线程
*/
volatile Thread thread;
/**
* nextWaiter是区别当前CLH队列是独占锁队列还是共享锁队列的标记
* - nextWaiter = SHARED,则CLH队列是共享锁队列
* - nextWaiter = EXCLUSIVE,则CLH队列是独占锁队列
*/
Node nextWaiter;
/**
* 如果是共享锁则返回true
* 如果是独占锁则返回false
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回上一个节点.如果为空则回抛出空指针异常
*
* @return Node 上一个节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/**
* 构造函数. 用于发布初始化的头节点或者共享锁标记
*/
Node() {
}
/**
* 构造函数. 用于addWaiter
*
* @param thread 节点所对应的线程
* @param mode 用于表示thread锁是独占锁还是共享锁
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* 构造函数. 用于Condition
*
* @param thread 节点所对应的线程
* @param waitStatus 线程的等待状态
*/
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
-
Node: CLH等待队列的节点,表示等待锁的线程队列
- 每个Node都对应一个线程thread
- 每个Node会通过prev和next分别指向上一个节点和下一个节点.分别代表上一个等待线程和下一个等待线程
- Node通过waitStatus来保存线程的等待状态
-
Node通过nextWaiter来区分线程是独占锁线程还是共享锁线程
- 独占锁线程的nextWaiter的值为EXCLUSIVE
- 共享锁线程的nextWaiter的值为SHARED
compareAndSetState
- compareAndSetState()在AQS中实现:
/**
* 如果当前的状态值等于期望值,则以原子方式将同步状态设置为给定的更新值.
* 此操作具有volatile读取的内存语义
*
* @param expect 期望值
* @param update 给定的更新值
* @return boolean 更新成功则返回true. 如果返回false,说明当前的状态值不等于给定的期望值
*/
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
- compareAndSetState是以原子的方式对当前线程进行操作的:如果当前线程的状态等于expect, 则将当前线程的状态设置为update
setExclusiveOwnerThread
- setExclusiveOwnerThread() 在AbstractOwnableSynchronizer中实现:
/**
* 独占锁线程的当前拥有者
*/
private transient Thread exclusiveOwnerThread;
/**
* 设置当前拥有的线程的独占访问权
* null表示没有线程拥有独占访问权
* 否则当前线程不会增加任何同步或者volatile访问
* @param thread 当前拥有的线程
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
- setExclusiveOwnerThread的作用: 设置线程为当前拥有独占锁的线程
state
- getState() 和setState() 都在AQS中实现:
/**
* 同步状态
*/
private volatile int state;
/**
* 获取同步状态的当前值
* 此操作具有volatile读取的内存语义
*
* @return int 当前同步状态的值
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态的值
* 此操作具有volatile写入的内存语义
*
* @param newState 设置的新的同步状态
*/
protected final void setState(int newState) {
state = newState;
}
-
state表示锁的状态:
- 对于独占锁而言 ,state=0表示锁是可获取状态,也就是锁未被任何线程所持有
- Java中的独占锁是可重入的,所以state的值可以大于1
addWaiter
- addWaiter的作用: 将当前线程添加到CLH队列中,也就是将当前线程添加到等待获取锁的等待线程队列中
-
addWaiter(Node.EXCLUSIVE):
- 创建当前线程的Node节点
- 在Node中记录当前显得对应的锁是独占锁类型
- 将该节点添加到CLH队列的末尾
- addWaiter() 在AQS中实现:
/**
* 创建当前线程和给定模式的节点并且添加到CLH队列中
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE-独占锁, Node.SHARED-共享锁
* @return Node 创建的新的节点
*/
private Node addWaiter(Node mode) {
// 新建一个Node节点,节点对应的线程是当前线程,当前线程的锁的模型是mode
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
// 如果CLH队列不为空,则将当前线程的节点添加到CLH队列的尾部
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 如果CLH队列为空,则初始化头部队列,当前节点作为CLH队列的头部head节点
initializeSyncQueue();
}
}
}
- 对于公平锁而言 ,addWaiter(Node.EXCLUSIVE)首先会创建一个Node节点,设置节点的类型为独占锁类型,然后再将Node节点添加到CLH队列的尾部
compareAndSetTail
- compareAndSetTail在AQS中实现:
/**
* 比较并设置尾部节点
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}
- compareAndSetTail属于CAS函数
-
compareAndSetTail通过原子的方式进行操作:
- 判断CLH队列的队尾是否等于expect, 如果是则将尾部设置为update节点
acquireQueued
- acquiredQueued() 在AQS中实现:
/**
* 通过排他非中断的模式获取队列中的线程
* 用于条件等待和获取
*
* @param node 节点
* @param arg 获取参数
* @return boolean 如果在等待时被中断则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
/**
* interrupted表示在CLH队列的调度过程中,当前线程在休眠时,有没有被中断过
*/
boolean interrupted = false;
try {
for (;;) {
// 获取上一个节点. 因为Node是当前线程对应的节点,在这里获取上一个等待锁的线程
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
- acquireQueued的作用是从队列中获取锁
shouldParkAfterFailedAcquire
- shouldParkAfterFailedAcquire() 在AQS中实现:
/**
* 检查并更新节点获取失败时的状态
* 如果当前线程应该阻塞则返回true,这是获取循环中的主要信号控制.要求参数pred等于node.prev
* 返回当前线程是否应该阻塞
*
* @param pred 上一个保存状态的节点
* @param node 当前节点
* @return boolean 如果当前线程应该阻塞则返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前继节点的状态为SIGNAL,则说明当前线程已经阻塞,需要使用unpark唤醒
return true;
if (ws > 0) {
// 如果前继节点是取消状态,则设置当前节点的前继节点为原前继节点的前继节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为0(不是任何状态)或者共享锁PROPAGATE状态,则设置前继节点为阻塞状态SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 等待状态waiStatus:
- CANCELLED - 1 : 当前线程已被取消
- SIGNAL - -1 : 当前线程的后继线程需要unpark唤醒. 当前线程的后继线程处于阻塞状态,而当前线程release或者cancel, 此时需要唤醒unpark当前线程的后继线程
- CONDITION - -2 : 当前线程处于Condition休眠状态,需要等待Condition唤醒
- PROPAGATE - -3 : 共享锁状态.其余线程可以获取到共享锁
- 0 : 当前线程不属于以上任何一种状态
-
shouldParkAfterFailedAcquire() 通过以下规则判断当前线程是否应该阻塞:
- 如果前继节点状态为SIGNAL, 说明当前线程需要unpark唤醒.当前线程处于阻塞状态,此时返回true
- 如果前继节点状态为CANCELLED, 说明前继节点已经取消,则需要通过回溯找到一个非CANCELLED状态有效节点值(值小于等于0)设置为前继节点的状态值,并返回false
- 如果前继节点状态值为0或者共享锁PROPAGATE. 即前继节点为非SIGNAL或非CANCELLED, 则设置前继节点为SIGNAL, 并返回false
- shouldParkAfterFailedAcquire() 判断当前线程应该阻塞时,会调用parkAndCheckInterrupt() 阻塞当前线程
parkAndCheckInterrupt
- parkAndCheckInterrupt() 在AQS中实现:
/**
* 阻塞当前线程,然后返回线程的中断状态
*
* @return 线程是否被中断
*/
private final boolean parkAndCheckInterrupt() {
// 通过LockSupoort中的park()方法阻塞当前线程
LockSupport.park(this);
// 返回线程的中断唤醒后的状态
return Thread.interrupted();
}
- 线程阻塞之后唤醒的两种情况:
- unpark唤醒: 前继节点对应的线程使用完锁之后,通过unpark() 方式唤醒当前线程
- interrupt中断唤醒: 其余线程通过interrupt() 中断当前线程
-
LockSupport中的park() 和unpark() 的作用类似于Object中的wait() 和notify(). 都是对线程进行阻塞和唤醒
- park() 和unpark() 是轻量级的
- wait() 和notify() 则必须先通过Synchronized获取同步锁
tryAcquire
- 在acquireQueue() 的for循环中:
// 获取node的前继节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
-
p == head && tryAcquire(arg) :
- 判断前继节点是否为CLH队列的表头,如果是的话,则通过tryAcquire() 尝试获取锁
- 使用p == head的条件的原因是为了保证当前线程获取锁的公平性:
- 首先,在shouldParkAfterFailedAcquire() 判断当前线程是否需要阻塞
- 接着,当前线程应该阻塞时,会调用parkAndCheckInterrupt() 来阻塞线程. 当线程解除阻塞时,会返回线程的中断状态.线程解除阻塞可能是由于其余线程调用了当前线程的unpark() 函数,也可能由于线程中断
- 如果当前线程是由于其余线程调用unpark() 函数唤醒时,根据释放锁的过程,唤醒当前线程的应该是前继节点所对应的线程. 也就是说,必然会是由于前继节点调用unpark() 函数唤醒当前线程
- p == head : 当前节点前继节点是CLH队列的头节点,当前继节点释放锁之后,当前节点开始获取锁.此时当前节点通过tryAcquire() 获取锁,获取成功,则通过setHead(node) 将当前节点设置为头结点并返回
- 总结: 如果前继节点调用unpark() 唤醒当前线程并且前继节点是CLH表头即满足p == head , 这时是符合公平性原则的. 否则出现当前线程因为线程中断interrupt唤醒时,就是非公平的了
- acquireQueued的作用: 当前线程根据公平性原则进行阻塞等待,直到获取锁.并且返回当前线程在等待过程中是否中断过
selfInterrupt
- selfInterrupt() 在AQS中实现:
/**
* 中端当前线程
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
-
selfInterrupt() 就是当前线程自己产生一个中断.如果在acquireQueued() 中,如果当前线程被中断过,则执行selfInterrupt(), 否则不会执行
- 在acquireQueued() 中,线程在阻塞状态中被中断唤醒时获取CPU的执行权,如果该线程的前面还有其余等待锁的线程,那么根据公平性原则,该线程是依然无法获取锁的.该线程会再次阻塞,直到该线程被前面等待锁的线程锁唤醒,此时线程才会获取锁,真正执行起来
- 在线程成功获取锁并真正执行起来之前,线程的中断会被忽略并且中断标识会被清除. 因为在parkAndCheckInterrupt() 中,线程调用了Thread.interrupt() 中断线程返回状态.该函数不仅仅返回当前中断状态,还会清除中断标识.因为之前的中断标识被清除,所以需要调用selfInterrupt() 重新产生一个中断
-
获取公平锁acquire():
- 首先通过tryAcquire() 尝试获取锁,如果获取成功的话就直接返回. 如果获取失败,再通过acquireQueued() 获取锁
- 在获取失败,需要调用acquireQueued() 时,首先通过addWaiter() 将当前线程加入CLH队列的末尾,然后调用acquireQueued() 方法,在CLH队列中排序等待获取锁,此时线程处于休眠状态,直到获取锁才返回. 如果在休眠等待过程中发生中断,则接着会调用selfInterrupt() 来自己产生一个中断
释放公平锁
unlock
- unlock() 在ReentrantLock中实现:
public void unlock() {
sync.release(1);
}
- unlock() 是释放锁函数,是通过AQS中的release() 函数实现的
-
1的含义: 锁的状态的参数
- 对于独占锁来说,当锁处于可获取状态时,状态值为0
- 对于独占锁来说,当锁初次被线程获取到时,状态值为1
- 因为公平锁是可重入锁,对于同一个线程,每次释放锁,锁的状态就会减1
release
- release() 在AQS中实现:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- release() 中首先会调用tryRelease() 尝试释放当前线程持有的锁. 如果成功,则唤醒后继等待的线程,并且返回true. 如果失败,则返回false
tryRelease
- tryRelease() 在ReentrantLock类中的Sync类中实现:
protected final boolean tryRelease(int releases) {
// 定义释放锁之后的状态
int c = getState() - releases;
// 如果当前线程不是锁的持有者,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果当前锁已经被当前线程释放,则设置锁的持有者为null.此时锁是可获取状态
free = true;
setExclusiveOwnerThread(null);
}
// 设置释放锁之后的当前线程的锁的状态
setState(c);
return free;
}
-
tryRelease() 的作用是尝试释放当前锁
- 如果当前线程不是锁的持有者,则抛出异常
- 在当前线程释放锁操作之后,当前线程对锁的拥有状态为0, 此时当前线程彻底释放该锁.设置锁的持有者为null, 这是锁即为可获取状态,同时更新当前线程的锁的状态为0
unparkSuccessor
- 在release() 中当前线程释放锁成功之后,会唤醒当前线程的后继线程
- 根据CLH队列的先入先出FIFO原则,当前线程必然是头结点head, 如果CLH队列非空,则唤醒下一个等待线程
private void unparkSuccessor(Node node) {
// 获取当前线程的状态
int ws = node.waitStatus;
if (ws < 0)
// 如果状态为负,则设置状态值为0
compareAndSetWaitStatus(node, ws, 0);
/*
* 获取当前节点的有效的后继节点
* 如果是无效的节点,则使用for循环从尾部开始寻找
* 这里的有效是指后继节点对应的状态的
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
- unparkSuccessor() 的作用的是唤醒当前线程的后继线程
- 后继线程唤醒之后,就可以继续获取锁并恢复运行
总结
- 释放锁的过程主要是更新当前线程对应的锁的状态
- 如果当前线程的锁已经释放,则设置锁持有的线程为null
- 接着设置当前线程的状态为空
- 然后唤醒后继线程
本文地址:https://blog.csdn.net/JewaveOxford/article/details/110871331