队列同步器 AbstractQueuedSynchronizer
(AQS), 是用来构建锁或者其他同步组件的基础框架(ReentrantLock、CountDownLatch、Semaphore…),它使用了一个int成员变量表示同步状态( private volatile int state;
),通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。
state
用关键字volatile
修饰,代表着该共享资源的状态一更改就能被所有线程可见,而AQS的加锁方式本质上就是多个线程在竞争state,当state
为0时代表线程可以竞争锁,不为0时代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态state
进行更改,这时就需要使用同步器提供的3个方法(getState()
、setState(int newState)
和compareAndSetState(int expect,int update)
)来进行操作,因为它们能够保证状态的改变是安全的。
AQS的子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步锁的获取和释放的方法来供自定义同步组件使用。
AQS定义两种资源共享方式:Exclusive
(独占,只有一个线程能执行,如ReentrantLock)和Share
(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state
的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
一般来说,自定义同步器要么是独占方式,要么是共享方式,他们也只需实现tryAcquire-tryRelease(比如ReentrantLock
)、tryAcquireShared-tryReleaseShared(比如CountDownLatch
)中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,比如ReentrantReadWriteLock
。
AQS依赖内部的同步队列(一个FIFO的CLH双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next):
节点Node是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部:
通过调用同步器的acquire(int arg)
方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这段代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)
方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)
方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)
方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
tryAcquire()
尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待),如果“插队”失败才会有后面的acquireQueued()
AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,因此这里的tryAcquire()默认抛一个异常。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
为什么不定义成一个抽象类呢?
因为上面说过,自定义同步器要么是独占方式,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可(AQS也支持自定义同步器同时实现独占和共享两种方式);如果定义成抽象类,那么独占式的自定义同步器也需要去实现共享式同步器的方法(即便什么也不需要做)。所以Doug Lea这样设计还是站在开发者的角度考虑的。
1.
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式(addWaiter()
尝试快速将新线程添加到等待队列的尾部,添加失败则调用enq()
方法重新添加)
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速添加到队尾失败
enq(node);
return node;
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private Node enq(final Node node) {
//CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
//队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//正常流程连接到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
//CAS自旋
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点,即该结点已成老二,那么便有资格去尝试获取资源
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点。(head所指的结点就是当前获取到资源的那个结点或null)
setHead(node);
//setHead()中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点,也就意味着之前拿完资源的结点出队了
p.next = null;
//成功获取资源
failed = false;
//返回等待过程中是否被中断过
return interrupted;
}
//前驱不是头节点或尝试获取资源失败
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
//如果线程在阻塞等待状态时被中断了,则将中断标记interrupted置为true
interrupted = true;
}
} finally {
//如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
//将当前结点设置为头结点
head = node;
//取消对当前Node对象的t成员变量thread线程对象的引用(该线程已经拿到了锁去执行了),便于垃圾回收该结点。
node.thread = null;
node.prev = null;
}
//如果前驱节点是SIGNAL,就阻塞;否则就改成SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//拿到前驱的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前驱状态为SIGNAL,说明前驱在释放锁的时候会通知自己,那么就返回true(自己进入等待状态)
return true;
if (ws > 0) {
//如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,让它释放锁后通知一下自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//调用park()使线程进入等待状态
LockSupport.park(this);
//如果被唤醒,查看自己是不是被中断的
return Thread.interrupted();
}
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)
方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态):
public final boolean release(int arg) {
//当前线程已经释放完了资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒等待队列里的下一个线程(唤醒当前结点的后继节点)
unparkSuccessor(h);
return true;
}
return false;
}
//tryRelease和tryAcquire一样,都是需要子类自己去实现的
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
//这里node一般为当前线程所在的结点。
int ws = node.waitStatus;
//置零当前线程所在的结点状态,允许失败
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//找到下一个需要唤醒的结点s
Node s = node.next;
//如果s为空或已取消
if (s == null || s.waitStatus > 0) {
s = null;
//从后向前找
for (Node t = tail; t != null && t != node; t = t.prev)
//把有效的节点给到s
if (t.waitStatus <= 0)
s = t;
}
//s不为空则说明这是最靠前的一个有效节点
if (s != null)
//唤醒
LockSupport.unpark(s.thread);
}
独占锁的获取和释放总结: 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)
方法释放同步状态,然后唤醒头节点的后继节点
通过调用同步器的acquireShared(int arg)
方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
这里tryAcquireShared()
依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取
在doAcquireShared(int arg)
方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
doAcquireShared(int arg)
的流程跟acquireQueued(final Node node, int arg)
也是很相似的,只不过共享模式将selfInterrupt()
放到doAcquireShared(int arg)
内部,而独占模式将selfInterrupt()
放到acquireQueued(final Node node, int arg)
外部了。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate()
与setHead()
的区别在于,setHeadAndPropagate()
除了会把当前结点设置为头结点,还会去判断资源是否有余量,如果有的话,就继续唤醒下一个结点线程:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
获取资源的流程就是:
tryAcquireShared()
尝试获取资源,成功则直接返回doAcquireShared()
进入等待队列park()直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。跟acquire()
的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友(共享) 的操作。
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的releaseShared(int arg)
方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态):
public final boolean releaseShared(int arg) {
//尝试释放资源
if (tryReleaseShared(arg)) {
//唤醒后继结点
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
AQS的CLH队列就是尝试去获取同一把锁的多个线程的等待队列。
如果锁是空闲的,这是有一个线程尝试去获取锁,会直接获取到,不会用到CLH队列;此时如果在锁被占用的情况下又有线程来竞争,才会用到CLH队列。
Node类有一个很重要的变量waitStatus
,用来标记这个结点的状态;这个变量有一个很重要的值SIGNAL(-1)
,表示当前结点释放资源的时候需要唤醒下一个结点。所以,每个节点在进入等待状态前,都需要判断自己的前驱结点的waitStatus
是不是SIGNAL
,如果不是,需要CAS将其修改为SIGNAL
,否则自己永远无法被唤醒(shouldParkAfterFailedAcquire()
方法中)。
此时CLH队列是一个空队列,新的Node结点入队时需要将它的前一个结点的waitStatus
设置为SIGNAL
,但是没有前一个结点,于是就只能新建一个空的结点(compareAndSetHead(new Node())
)先让head和tail都指向这个虚节点,之后再在CAS自旋中将这个线程结点连接到虚节点后面(enq()
方法中)
acquireQueued()
方法负责使队列中的结点不断自旋,直到自己的前驱节点变成了头结点且tryAcquire()
方法返回true,说明自己也可以尝试去获取资源了,如果成功获取到,就将自己设置为头结点(setHead()
方法会将自己的前驱节点和变量thread设置为空,因为该Node结点封装的线程已经获取到了锁,所以Node也没有必要继续连接着这个线程对象的引用了。不然不好GC)并且将自己的后继节点也设置为空,这样这个头结点就成了一个真正的“虚节点”(但是它的waitStatus
此时还是SIGNAL
)。也就是说当一个结点的线程获取到了锁时,那么封装这个线程的Node结点就没有用了,将其内部各个变量都置空变成一个“虚结点”也是为了方便GC。
另外,释放资源(unparkSuccessor()
)时,由于之前已将头结点的后继结点置空,因此会从尾结点开始向前查找,找到最靠前的一个结点将其唤醒。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_43613793/article/details/120550949
内容来源于网络,如有侵权,请联系作者删除!