AbstractQueuedSynchronizer详解
一、AQS
AQS(AbstractQueuedSynchronizer
,抽象队列同步器)是JUC(java.util.concurrent
)包中构建锁和其他同步组件(如Semaphore
, CountDownLatch
, ReentrantReadWriteLock
等)的基础框架。
你可以把它理解成一个多线程访问共享资源的“管理员”或“交警”。它的核心职责是:当共享资源被占用时,将未能成功获取资源的线程阻塞,并将其放入一个队列中进行管理;当资源被释放时,再从队列中唤醒等待的线程来尝试获取。
AQS的核心思想可以概括为三点:
一个状态量(State):
- 这是一个
volatile int
类型的变量,表示共享资源的状态。 - 不同的同步器对它的含义解读不同。对于
ReentrantLock
,state
表示锁的重入次数(0表示未被占用,>0表示被占用且数值为重入次数);对于Semaphore
,state
表示剩余的许可证数量;对于CountDownLatch
,state
表示倒计数的数值。
- 这是一个
一个FIFO线程等待队列(CLH变体):
- 这是一个双向链表结构的队列,用于存放所有等待获取资源的线程。当线程尝试获取资源失败时,AQS会将当前线程包装成一个
Node
节点,并将其加入队列尾部。
- 这是一个双向链表结构的队列,用于存放所有等待获取资源的线程。当线程尝试获取资源失败时,AQS会将当前线程包装成一个
两种资源访问模式:
- 独占模式(Exclusive): 同一时刻只有一个线程能访问资源,如
ReentrantLock
。 - 共享模式(Shared): 同一时刻多个线程可以访问资源,如
Semaphore
,CountDownLatch
。
- 独占模式(Exclusive): 同一时刻只有一个线程能访问资源,如
AQS采用了模板方法模式,它定义了顶级逻辑骨架(如获取/释放资源、入队/出队),而将一些关键步骤的实现(如“尝试获取资源”、“尝试释放资源”)以protected
方法的形式留给子类去重写。这就是为什么它叫“抽象”队列同步器。
二、加锁(acquire)与释放(release)
我们以最经典的独占模式(如ReentrantLock
)为例,来分析acquire
和release
的流程。JDK 1.8的源码略有调整但核心逻辑不变。
1. 加锁(acquire)流程
核心方法是acquire(int arg)
,它在AQS中的定义是:
1 | public final void acquire(int arg) { |
这是一个非常精妙的“短路与”操作,包含了获取锁的完整逻辑。我们一步步拆解:
第一步:tryAcquire(arg)
- 作用: 尝试直接获取资源。这是AQS留给子类实现的核心方法。对于
ReentrantLock
,它的非公平锁实现NonfairSync.tryAcquire
会调用nonfairTryAcquire
方法。 - 逻辑:
- 读取当前
state
值。 - 如果
state == 0
,说明锁空闲。尝试用CAS操作将state
设置为arg
(通常是1),如果成功,则将当前线程设置为独占所有者,返回true
。 - 如果
state != 0
,但当前线程已经是锁的独占所有者(重入),则将state
加上arg
,返回true
。 - 如果以上都不满足,获取失败,返回
false
。
- 读取当前
- 结果: 如果
tryAcquire
返回true
,则acquire
方法直接结束,线程成功获取锁。如果返回false
,则进入后续流程。
第二步:addWaiter(Node mode)
- 作用: 将当前线程包装成一个独占模式(Node.EXCLUSIVE)的Node节点,并添加到等待队列的尾部。
- 逻辑:
- 创建一个新的
Node
节点,模式为传入的mode
(这里是Node.EXCLUSIVE
)。 - 使用一个“快速入队”的尝试:如果队列的尾节点(
tail
)不为空,则尝试用CAS操作将新节点设置为新的尾节点。 - 如果快速入队失败(比如并发竞争导致CAS失败,或者
tail
为空),则调用enq(node)
方法。enq
方法内部是一个循环,会不断尝试CAS入队,直到成功为止。这个方法还负责在队列为空时初始化一个虚拟的头节点(dummy head)。
- 创建一个新的
第三步:acquireQueued(final Node node, int arg)
- 作用: 让已经入队的节点,以“自旋”的方式不断尝试获取资源,如果失败则阻塞,被唤醒后继续尝试。这是整个AQS同步队列管理的核心。
- 逻辑:
- 进入一个
for (;;)
死循环。 - 获取当前节点的前驱节点
p
。 - 如果
p
是头节点,说明自己是队列中的第一个等待者(因为头节点是虚拟节点或刚释放锁的节点),有资格再去尝试获取资源(tryAcquire(arg)
)。 - 如果尝试成功:
- 将当前节点设置为新的头节点(
setHead(node)
)。 - 将老的头节点的
next
引用置为null
,帮助GC。 - 返回
false
,表示在等待过程中没有被中断过。
- 将当前节点设置为新的头节点(
- 如果尝试失败,或者前驱节点不是头节点:
- 调用
shouldParkAfterFailedAcquire(p, node)
。这个方法检查前驱节点的waitStatus
(等待状态)。如果前驱节点的状态是SIGNAL(-1)
,则返回true
,表示当前节点可以安全地阻塞了。如果不是,则将其设置为SIGNAL
(意思是“你释放锁的时候记得唤醒我”),然后返回false
,进行下一轮循环重试。 - 如果
shouldParkAfterFailedAcquire
返回true
,则调用parkAndCheckInterrupt()
。这个方法使用LockSupport.park(this)
阻塞当前线程。线程在这里被挂起,等待被唤醒。
- 调用
- 当线程被唤醒后(通常是前驱节点释放锁后唤醒它),会继续下一轮循环,再次尝试获取锁。
- 进入一个
- 返回值: 返回
true
表示在等待过程中线程被中断过。
第四步:selfInterrupt()
- 如果
acquireQueued
返回true
(表示在排队等待过程中被中断过),则在成功获取到锁之后,补上一次中断操作(Thread.currentThread().interrupt()
),响应中断。
加锁流程总结:tryAcquire
-> (失败) -> addWaiter
-> acquireQueued
(循环尝试tryAcquire
或阻塞)-> (成功后) selfInterrupt
(如果需要)。
2. 释放锁(release)流程
核心方法是release(int arg)
:
1 | public final boolean release(int arg) { |
第一步:tryRelease(int releases)
- 作用: 尝试释放资源。同样是子类必须实现的方法。
- 逻辑(以
ReentrantLock
为例):- 将
state
减去releases
。 - 如果当前线程不是锁的独占所有者,抛出
IllegalMonitorStateException
异常。 - 如果
state
减为0
,说明锁完全释放了(解决了重入问题),将独占所有者线程设为null
,并返回true
。 - 如果
state
不为0
,说明只是部分释放(重入锁释放了一层),返回false
,锁依然被当前线程持有(但重入次数减少了)。
- 将
- 结果: 只有完全释放(
state==0
)时返回true
,才会进行后续的唤醒操作。
第二步和第三步:检查头节点
- 获取等待队列的头节点
h
。 - 如果头节点不为
null
并且它的waitStatus
不为0
。waitStatus
不为0
通常意味着它的值是SIGNAL(-1)
,表示它有责任唤醒它的后继节点。也可能是PROPAGATE(-3)
或已取消的CANCELLED(1)
。
第四步:unparkSuccessor(Node node)
- 作用: 唤醒头节点
node
的后继节点。 - 逻辑:
- 将当前头节点的
waitStatus
用CAS操作置为0
(表示它已经完成了唤醒的职责)。 - 从队列尾部开始向前遍历,找到离头节点最近的、且
waitStatus <= 0
(即非取消状态)的节点s
。为什么要从后往前?因为在addWaiter
入队时,是先设置新节点的prev
指针,然后才CAS设置尾指针,最后设置原尾节点的next
指针。从后往前遍历可以避免因为next
指针还未正确设置而丢失节点的问题。 - 如果找到了这个节点
s
,则调用LockSupport.unpark(s.thread)
唤醒该节点对应的线程。
- 将当前头节点的
释放流程总结:tryRelease
-> (成功) -> 检查头节点状态 -> unparkSuccessor
(唤醒下一个有效的等待线程)。
总结与图示
加锁流程:
graph TD A[线程调用acquire] --> B{tryAcquire 成功?} B -- 是 --> C[获取锁成功, 流程结束] B -- 否 --> D[addWaiter: 将线程包装成Node并入队] D --> E[acquireQueued: 循环尝试] subgraph E [acquireQueued 循环] F{前驱是头节点 & tryAcquire成功?} F -- 是 --> G[将自己设为新头节点, 返回] F -- 否 --> H[shouldParkAfterFailedAcquire:
设置前驱状态为SIGNAL] H --> I{parkAndCheckInterrupt: 阻塞线程} I -- 被unpark唤醒 --> F end E --> J{等待过程中被中断?} J -- 是 --> K[selfInterrupt: 补上中断] J -- 否 --> C
释放流程:
graph TD A[线程调用release] --> B{tryRelease 成功?} B -- 否 --> C[返回false] B -- 是 --> D[获取头节点h] D --> E{h != null & h.waitStatus != 0?} E -- 否 --> F[返回true] E -- 是 --> G[unparkSuccessor: 唤醒后继节点] G --> F
AQS通过这种“状态管理” + “CLH队列” + “模板方法”的设计,提供了一个非常强大且灵活的基础,使得基于它构建的各种同步工具既高效又可靠。理解AQS是深入理解JUC并发包的钥匙。