ReentrantLock是使用AQS实现的一种可重入的独占锁,并且提供了公平和非公平两种策略。
ReentractLock的内部类Sync继承了AbstractQueuedSynchronizer抽象类,并且提供了两个内部类FairSync和NonfairSync(即公平锁与非公平锁的落地实现),而具体采用哪一种加锁策略则则是由ReentractLock的构造函数进行指定。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从上面的构造函数可以看到ReentrantLock默认使用的是非公平锁,传入true启用公平锁,传入false则是非公平锁
public void lock() {
sync.lock();
}
从加锁的过程来看,lock方法调用了sync的lock方法,而Sync为抽象类,没有实现lock方法
// Sync类的lock方法
abstract void lock();
那么ReentrantLock的lock方法一定调用的是其子类NonfairSync或FairSync类的lock方法(也就是说加锁的过程与ReentractLock采用的策略有关系),首先分析FairSync(公平锁)的加锁过程。
// FairSync(公平锁)类的lock方法
final void lock() {
acquire(1);
}
// AQS的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
从上面的源码可以看出FairSync类的lock方法调用了AQS的acquire方法,而acquire方法调用了tryAcquire方法的落地实现依旧在FairSync类中,在分析tryAcquire方法前先分析一下AQS的内部属性
AQS内部定义了一个Node类,Node中有两个Node类的属性prev和next,因此Node类应该是一个双向链表,AQS中定义了Node类的head和tail,那么AQS的内部维护了一个Node类的双向链表,Node类中还有一个nextWaiter属性(区别于prev和next,nextWaiter只有单向的引用),分析应该是一个单链表,Node类中的thread属性表示Node封装的线程,waitStatus表示Node的状态。
在AQS中有一个int类型的state属性,根据锁的实现不同有不同的含义,在ReentractLock中state表示锁的可重入次数,在ReentractReadWriteLock中state的高16位表示读锁的获取次数,低16位则表示写锁的可重入次数。如果自己使用AQS实现一把锁,那么也可以自定义state的表示含义。AQS继承了AbstractOwnableSynchronizer的exclusiveOwnerThread属性表示持有锁的线程,用于可重入的判断。
分析完AQS的属性后,再看FairSync类的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
if (c == 0) { //c == 0表示锁未被其他线程获取,可以加锁
if (!hasQueuedPredecessors() && // 判断自己是否需要排队
compareAndSetState(0, acquires)) { // CAS修改锁的状态--加锁
setExclusiveOwnerThread(current); // 设置持有锁的线程为当前线程
return true; // 加锁成功
}
}
else if (current == getExclusiveOwnerThread()) { // 当前线程持有锁,可重入
int nextc = c + acquires; // 设置重入次数
if (nextc < 0) // 判断重入次数不能超过int范围(不能溢出)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 设置重入次数,此处未使用cas操作,因为当前线程持有锁,所以此操作线程安全
return true; // 设置重入成功
}
return false; //既不能加锁又不能重入,无法获取到锁
}
如果执行的线程可以获取到锁,tryAcquire返回true并且设置state变量加1,则AQS的acquire方法结束,lock方法结束,加锁成功。在判断自己是否需要排队时调用了hasQueuedPredecessors方法。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors方法取得了AQS的tail和head,当没有其他线程加锁时,tail和head的值都为null,即h != t返回false,方法返回false,表示自己不需要排队可以直接加锁。分析到这里可以发现如果执行环境为单线程或线程间交替执行,那么ReentrantLock的加锁仅仅是改变AQS中state属性的值,并不涉及操作系统用户态与内核态的切换,因此效率较高。
如果此时线程t1加锁成功并持有锁未释放,线程t2请求加锁呢?继续走上面的acquire方法,tryAcquire(int acquires)
直接返回false,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
,首先分析addWaiter方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);// 将获取锁失败的线程封装成Node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 取得AQS的尾节点
if (pred != null) { // 尾节点不为空,AQS已经初始化过了
node.prev = pred; // 设置当前封装好节点的上一个节点为尾节点
if (compareAndSetTail(pred, node)) { // CAS设置尾节点为刚刚封装好的节点
pred.next = node; // CAS成功则设置前尾节点的下一个节点为新设置的尾节点
return node; // 返回新节点(此时等同于尾节点)
}
}
enq(node); // 尾节点为空(队列未初始化)或CAS设置尾节点失败了,则执行enq
return node; // 返回新节点(此时等同于尾节点)
}
private Node enq(final Node node) {
for (;;) { // 此处执行死循环
Node t = tail; // 取得尾节点
if (t == null) { // 尾节点为空(队列未初始化)
if (compareAndSetHead(new Node())) // cas设置尾节点为 nwe Node()
tail = head; // 设置头节点指向尾节点
} else { // 尾节点不为空,AQS已经初始化过了
node.prev = t; // CAS设置新节点加入队列并成为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
当线程t2执行完addWaiter
方法后,初始化AQS中的Node队列,并设置将t2封装成Node入队。
由于此时t2已经入队,那么应该将线程t2设置为休眠,等待线程t1释放锁后唤醒继续执行,那么源码是这么实现的吗?我们来分析一下acquireQueued
方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 线程是否被中断,记录标志
for (;;) { // 此处执行死循环
final Node p = node.predecessor(); // 获取上一个节点
if (p == head && tryAcquire(arg)) { // 如果上一个节点为头节点,并且自己获取锁成功
setHead(node); // 将自己设置为头节点
p.next = null; // 设置原头节点的next节点为空
failed = false;
return interrupted; // 返回中断标志
}
if (shouldParkAfterFailedAcquire(p, node) && // 获取锁失败,开始休眠
parkAndCheckInterrupt()) // 检查中断标志
interrupted = true; // 恢复用户的中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Node内部类方法,获取上一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
shouldParkAfterFailedAcquire方法决定着自旋次数为2次
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 取得上一节点的waitStatus
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
当线程t2执行到acquireQueued方法出,node的上一级节点为头节点,如果此时当前线程尝试获取锁成功,则将自己设置为头节点并退出,如果获取锁失败,则执行shouldParkAfterFailedAcquire方法将其上一节点的waitStatus使用CAS设置为-1,如果CAS失败了,由于在for死循环中,则继续尝试获取锁,失败后继续使用CAS修改上一节点waitStatus的状态,直到获取锁成功或CAS修改状态成功,如果CAS修改状态成功了,则返回for循环尝试最后一次获取锁,如果还是获取锁失败,则继续执行shouldParkAfterFailedAcquire方法,此时方法返回true,则执行parkAndCheckInterrupt方法将线程阻塞挂起。因此即上面的问题,线程被封装为Node加入队列后并不是立即阻塞挂起,而是判断自己是不是头节点的后一个节点,如果自己是头节点的后一个节点,则自旋尝试获取锁至少两次,失败后被阻塞挂起,此时的Node队列如下。
而调用parkAndCheckInterrupt后,线程被阻塞在了LockSupport.park(this);
处。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);// 线程被阻塞在此处
return Thread.interrupted(); // 返回并清空中断标志
}
如果线程在阻塞挂起期间被用户设置了中断标志,则线程被唤醒后继续在return Thread.interrupted();
处执行,检测是否设置了中断标志,如果设置了中断标志,则parkAndCheckInterrupt()方法返回true, 此时中断标志被清空。执行完parkAndCheckInterrupt方法后应回到for死循环出继续执行interrupted = true;
,表明用户设置了中断,然后继续for循环获取锁,此时由于t1将锁释放了,那么t2应该获取锁成功,随后将自己设置为头节点并返回true,acquireQueued方法结束,进入AQS的acquire方法中,if判断!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
为true,执行selfInterrupt();
方法恢复用户设置的中断。
如果t1未释放锁,t2还阻塞在队列中,此时线程t3调用了lock
方法,t3既不是得到锁的线程也不是队列中的第二个线程,调用hasQueuedPredecessors()
方法一定会返回false,那么tryAcquire()
方法一定会返回false。此时调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法将t3加入队列并将其阻塞于LockSupport.park(this);
处。
非公平锁的加锁调用的是NonfairSync类的lock方法
final void lock() {
if (compareAndSetState(0, 1)) //CAS设置state的状态
setExclusiveOwnerThread(Thread.currentThread()); //设置持有锁线程为当前线程,加锁成功
else // state的状态设置失败
acquire(1);
}
当使用CAS设置state的状态失败后调用AQS的acquire方法如上所示,但是tryAcquire方法则是由NonfairSync类自己实现的
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
tryAcquire方法调用了Sync类的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire方法与公平锁的tryAcquire方法类似,但是少了hasQueuedPredecessors()
,也就是说非公平所并不需要判断自己是否有获取锁的资格,直接尝试获取锁,其他过程都是调用AQS内部的方法,与公平锁的加锁过程相同。
ReentractLock调用unlock方法进行解锁
public void unlock() {
sync.release(1);
}
unlock方法内部调用了AQS内的release(1)
方法
public final boolean release(int arg) {
if (tryRelease(arg)) { //尝试进行解锁,由ReentractLock内的Sync实现
Node h = head; // 取得头节点
if (h != null && h.waitStatus != 0) // 头节点不为空,且队列中存在线程排队等待唤醒
unparkSuccessor(h); //唤醒阻塞的线程,传入节点为头节点
return true;
}
return false;
}
Sync实现了tryRelease
方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 取得释放锁后的state的值
if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程不是持有锁的线程,抛出异常
throw new IllegalMonitorStateException();
boolean free = false; // 判断锁是否可以被其他线程获取
if (c == 0) { // 如果释放锁后state的值为0,则锁可以被其他线程获取
free = true;
setExclusiveOwnerThread(null); // 清除持有锁的线程
}
setState(c); // 设置state的值,此处因为线程持有锁,因此不需要cas操作
return free;
}
由上面的代码可知,如果线程多次使用了lock()
方法(既锁的重入次数大于1),那么调用release
方法后,只是将AQS中state的值减一,线程依旧持有锁。如果锁的重入次数为1,那么调用release
方法后,清空了占有锁的线程,并判断AQS队列中是否存在线程排队等待唤醒,如果存在的话则调用unparkSuccessor
方法唤醒线程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 取得节点waitStatus的值
if (ws < 0) // 如果waitStatus的值小于0,则使用cas将其设置为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 取得下一个节点
if (s == null || s.waitStatus > 0) { // 如果下一个节点不存在或者对应节点的线程被取消了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)// 从尾节点向前找节点直到头节点
if (t.waitStatus <= 0) // 找到一个需要被唤醒的节点
s = t;
}
if (s != null) // 如果找到了需要被唤醒的线程
LockSupport.unpark(s.thread); // 唤醒线程到park处继续执行
}
以上面的t3阻塞在队列中为例,假如此时线程t1调用unlock
方法释放了锁,由于线程t1只调用了一次lock
方法,因此调用tryRelease
方法一定会返回true,AQS队列的头节点不为空,且队列中存在线程t2,t3排队等待唤醒,因此执行unparkSuccessor(h);
方法并传入头节点。取得头节点的下一节点为封装线程t2的节点,该节点不为空且waitStatus=-1,因此直接执行LockSupport.unpark(s.thread);
方法唤醒包裹在node节点中的线程t2。t2被唤醒后回到LockSupport.park(this);
处继续执行代码,返回线程的中断标志并将其清除,返回到acquireQueued
方法的for循环中继续执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
判断线程在休眠期间是否被外部设置了中断,如果设置了中断则设置变量interrupted
为true,继续for循环取得该节点的上一节点为头节点,设置当前节点为头节点,并断开原头结点的引用链接,随后返回interrupted
,随后恢复acquire(int arg)
方法的执行,如果设置了中断则调用selfInterrupt();
方法回复中断后返回,否则直接返回,随后lock
方法结束。
Node中还存在一个属性nextWaiter,也是使用Node进行定义的
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
对比于synchronized关键字中的wait
与notify
,使用AQS的await
与signal
可以实现与之类似的操作,区别在于wait
方法只能对一个共享变量进行同步,而AQS可以通过使用一把锁创造多个条件变量(即对多个对象同步)。
在ReentrantLock中有一个newCondition方法可以创建一个条件变量
final ConditionObject newCondition() {
return new ConditionObject();
}
而ConditionObject是AQS的内部类,即AQS的内部结构如下所示
在ConditionObject中定义了两个Node类型的属性firstWaiter,lastWaiter,其中firstWaiter用于记录调用了ConditionObject类实例的await方法后阻塞在该条件变量的阻塞队列的第一个线程对应的Node,lastWaiter记录最后一个阻塞线程对应的Node。与上面Node节点的nextWaiter属性呼应(即条件变量的阻塞队列是一个由Node节点形成的单链表)。
调用ConditionObject类实例的await方法可以使当前线程被封装为Node对象,放入该ConditionObject类实例的阻塞队列中。
public final void await() throws InterruptedException {
if (Thread.interrupted()) //如果当前线程被设置了中断标志,抛出中断异常
throw new InterruptedException();
Node node = addConditionWaiter(); // 设置当前线程封为Node并加入队列
int savedState = fullyRelease(node);// 释放当前线程获取到的锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 调用park方法将线程阻塞挂起,使用while循环是为了防止虚假唤醒
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用signal方法会将阻塞在条件队列中的node移动至AQS队列中
public final void signal() {
if (!isHeldExclusively()) // 判断线程是否获取到了锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 将第一个阻塞队列的Node节点移动至AQS队列
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/Nicholas_GUB/article/details/122472322
内容来源于网络,如有侵权,请联系作者删除!