一、AQS

AQS(AbstractQueuedSynchronizer,抽象队列同步器)是JUC(java.util.concurrent)包中构建锁和其他同步组件(如Semaphore, CountDownLatch, ReentrantReadWriteLock等)的基础框架

你可以把它理解成一个多线程访问共享资源的“管理员”或“交警”。它的核心职责是:当共享资源被占用时,将未能成功获取资源的线程阻塞,并将其放入一个队列中进行管理;当资源被释放时,再从队列中唤醒等待的线程来尝试获取。

AQS的核心思想可以概括为三点:

  1. 一个状态量(State)

    • 这是一个volatile int类型的变量,表示共享资源的状态。
    • 不同的同步器对它的含义解读不同。对于ReentrantLockstate表示锁的重入次数(0表示未被占用,>0表示被占用且数值为重入次数);对于Semaphorestate表示剩余的许可证数量;对于CountDownLatchstate表示倒计数的数值。
  2. 一个FIFO线程等待队列(CLH变体)

    • 这是一个双向链表结构的队列,用于存放所有等待获取资源的线程。当线程尝试获取资源失败时,AQS会将当前线程包装成一个Node节点,并将其加入队列尾部。
  3. 两种资源访问模式

    • 独占模式(Exclusive): 同一时刻只有一个线程能访问资源,如ReentrantLock
    • 共享模式(Shared): 同一时刻多个线程可以访问资源,如Semaphore, CountDownLatch

AQS采用了模板方法模式,它定义了顶级逻辑骨架(如获取/释放资源、入队/出队),而将一些关键步骤的实现(如“尝试获取资源”、“尝试释放资源”)以protected方法的形式留给子类去重写。这就是为什么它叫“抽象”队列同步器。


二、加锁(acquire)与释放(release)

我们以最经典的独占模式(如ReentrantLock)为例,来分析acquirerelease的流程。JDK 1.8的源码略有调整但核心逻辑不变。

1. 加锁(acquire)流程

核心方法是acquire(int arg),它在AQS中的定义是:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这是一个非常精妙的“短路与”操作,包含了获取锁的完整逻辑。我们一步步拆解:

第一步:tryAcquire(arg)

  • 作用尝试直接获取资源。这是AQS留给子类实现的核心方法。对于ReentrantLock,它的非公平锁实现NonfairSync.tryAcquire会调用nonfairTryAcquire方法。
  • 逻辑
    1. 读取当前state值。
    2. 如果state == 0,说明锁空闲。尝试用CAS操作将state设置为arg(通常是1),如果成功,则将当前线程设置为独占所有者,返回true
    3. 如果state != 0,但当前线程已经是锁的独占所有者(重入),则将state加上arg,返回true
    4. 如果以上都不满足,获取失败,返回false
  • 结果: 如果tryAcquire返回true,则acquire方法直接结束,线程成功获取锁。如果返回false,则进入后续流程。

第二步:addWaiter(Node mode)

  • 作用将当前线程包装成一个独占模式(Node.EXCLUSIVE)的Node节点,并添加到等待队列的尾部
  • 逻辑
    1. 创建一个新的Node节点,模式为传入的mode(这里是Node.EXCLUSIVE)。
    2. 使用一个“快速入队”的尝试:如果队列的尾节点(tail)不为空,则尝试用CAS操作将新节点设置为新的尾节点。
    3. 如果快速入队失败(比如并发竞争导致CAS失败,或者tail为空),则调用enq(node)方法。enq方法内部是一个循环,会不断尝试CAS入队,直到成功为止。这个方法还负责在队列为空时初始化一个虚拟的头节点(dummy head)。

第三步:acquireQueued(final Node node, int arg)

  • 作用让已经入队的节点,以“自旋”的方式不断尝试获取资源,如果失败则阻塞,被唤醒后继续尝试。这是整个AQS同步队列管理的核心。
  • 逻辑
    1. 进入一个for (;;)死循环。
    2. 获取当前节点的前驱节点p
    3. 如果p是头节点,说明自己是队列中的第一个等待者(因为头节点是虚拟节点或刚释放锁的节点),有资格再去尝试获取资源(tryAcquire(arg))。
    4. 如果尝试成功
      • 将当前节点设置为新的头节点(setHead(node))。
      • 将老的头节点的next引用置为null,帮助GC。
      • 返回false,表示在等待过程中没有被中断过。
    5. 如果尝试失败,或者前驱节点不是头节点
      • 调用shouldParkAfterFailedAcquire(p, node)。这个方法检查前驱节点的waitStatus(等待状态)。如果前驱节点的状态是SIGNAL(-1),则返回true,表示当前节点可以安全地阻塞了。如果不是,则将其设置为SIGNAL(意思是“你释放锁的时候记得唤醒我”),然后返回false,进行下一轮循环重试。
      • 如果shouldParkAfterFailedAcquire返回true,则调用parkAndCheckInterrupt()。这个方法使用LockSupport.park(this)阻塞当前线程。线程在这里被挂起,等待被唤醒。
    6. 当线程被唤醒后(通常是前驱节点释放锁后唤醒它),会继续下一轮循环,再次尝试获取锁。
  • 返回值: 返回true表示在等待过程中线程被中断过。

第四步:selfInterrupt()

  • 如果acquireQueued返回true(表示在排队等待过程中被中断过),则在成功获取到锁之后,补上一次中断操作Thread.currentThread().interrupt()),响应中断。

加锁流程总结
tryAcquire -> (失败) -> addWaiter -> acquireQueued(循环尝试tryAcquire或阻塞)-> (成功后) selfInterrupt(如果需要)。

2. 释放锁(release)流程

核心方法是release(int arg)

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { // 步骤1:尝试释放资源
Node h = head; // 步骤2:获取头节点
if (h != null && h.waitStatus != 0) // 步骤3:检查头节点是否需要唤醒后继者
unparkSuccessor(h); // 步骤4:唤醒后继节点
return true;
}
return false;
}

第一步:tryRelease(int releases)

  • 作用尝试释放资源。同样是子类必须实现的方法。
  • 逻辑(以ReentrantLock为例):
    1. state减去releases
    2. 如果当前线程不是锁的独占所有者,抛出IllegalMonitorStateException异常。
    3. 如果state减为0,说明锁完全释放了(解决了重入问题),将独占所有者线程设为null,并返回true
    4. 如果state不为0,说明只是部分释放(重入锁释放了一层),返回false,锁依然被当前线程持有(但重入次数减少了)。
  • 结果: 只有完全释放(state==0)时返回true,才会进行后续的唤醒操作。

第二步和第三步:检查头节点

  • 获取等待队列的头节点h
  • 如果头节点不为null并且它的waitStatus不为0waitStatus不为0通常意味着它的值是SIGNAL(-1),表示它有责任唤醒它的后继节点。也可能是PROPAGATE(-3)或已取消的CANCELLED(1)

第四步:unparkSuccessor(Node node)

  • 作用唤醒头节点node的后继节点
  • 逻辑
    1. 将当前头节点的waitStatus用CAS操作置为0(表示它已经完成了唤醒的职责)。
    2. 从队列尾部开始向前遍历,找到离头节点最近的、且waitStatus <= 0(即非取消状态)的节点s。为什么要从后往前?因为在addWaiter入队时,是先设置新节点的prev指针,然后才CAS设置尾指针,最后设置原尾节点的next指针。从后往前遍历可以避免因为next指针还未正确设置而丢失节点的问题。
    3. 如果找到了这个节点s,则调用LockSupport.unpark(s.thread)唤醒该节点对应的线程

释放流程总结
tryRelease -> (成功) -> 检查头节点状态 -> unparkSuccessor(唤醒下一个有效的等待线程)。


总结与图示

加锁流程

graph TD
    A[线程调用acquire] --> B{tryAcquire 成功?}
    B -- 是 --> C[获取锁成功, 流程结束]
    B -- 否 --> D[addWaiter: 将线程包装成Node并入队]
    D --> E[acquireQueued: 循环尝试]
    subgraph E [acquireQueued 循环]
        F{前驱是头节点 & tryAcquire成功?}
        F -- 是 --> G[将自己设为新头节点, 返回]
        F -- 否 --> H[shouldParkAfterFailedAcquire: 
设置前驱状态为SIGNAL] H --> I{parkAndCheckInterrupt: 阻塞线程} I -- 被unpark唤醒 --> F end E --> J{等待过程中被中断?} J -- 是 --> K[selfInterrupt: 补上中断] J -- 否 --> C

释放流程

graph TD
    A[线程调用release] --> B{tryRelease 成功?}
    B -- 否 --> C[返回false]
    B -- 是 --> D[获取头节点h]
    D --> E{h != null & h.waitStatus != 0?}
    E -- 否 --> F[返回true]
    E -- 是 --> G[unparkSuccessor: 唤醒后继节点]
    G --> F

AQS通过这种“状态管理” + “CLH队列” + “模板方法”的设计,提供了一个非常强大且灵活的基础,使得基于它构建的各种同步工具既高效又可靠。理解AQS是深入理解JUC并发包的钥匙。