一、AQS的核心设计思想

AbstractQueuedSynchronizer(JSR 166)是 Doug Lea 为 JDK 并发包设计的"排队-模板方法"框架。所有基于AQS的同步器(ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier)都遵循同一套模式:**用int state表示资源数量,用CLH队列管理等待线程**。

1.1 两大核心组成部分

// AQS 核心数据结构
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 资源计数器(volatile保证可见性)
    // ReentrantLock: state=1表示已加锁,>1表示重入次数
    // Semaphore: state=可用许可证数量
    // CountDownLatch: state=初始count值
    private volatile int state;

    // CLH变体队列(FIFO双向队列)
    // head = 当前持有锁的线程(或已取消的节点)
    // tail = 最新加入的节点
    private transient volatile Node head;
    private transient volatile Node tail;

    // Node是等待队列中的节点
    static final class Node {
        volatile int waitStatus;  // CANCELLED/SIGNAL/PROPAGATE/CONDITION
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;    // 封装的线程
        Node nextWaiter;           // 条件队列指针(ConditionObject用)

        // 关键常量
        static final int CANCELLED =  1;  // 已取消(超时或中断)
        static final int SIGNAL    = -1;  // 后继节点需要被唤醒
        static final int CONDITION = -2;  // 节点在条件队列中等待
        static final int PROPAGATE = -3;  // 共享锁释放需要传播给后继
    }
}

// AQS子类需要覆盖的两个模板方法:
// ✅ tryAcquire(int arg)    — 独占模式:尝试获取锁
// ✅ tryRelease(int arg)    — 独占模式:尝试释放锁
// ✅ tryAcquireShared(int)  — 共享模式:尝试获取共享锁
// ✅ tryReleaseShared(int)  — 共享模式:尝试释放共享锁
// ✅ isHeldExclusively()   — 是否被当前线程独占

二、acquire模板方法源码解析

2.1 独占模式获取锁

// public final void acquire(int arg)
// 入口:尝试"狡猾获取"——tryAcquire失败后才入队等待
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&    // 子类实现,原子CAS
        acquireQueued(         // 入CLH队列,自旋+park
            addWaiter(Node.EXCLUSIVE), // 添加独占节点
            arg)) {
        // 如果acquireQueued返回true(被中断),自我中断
        selfInterrupt();
    }
}

// addWaiter: 快速路径一次CAS入队,失败则走enq自旋入队
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {           // 队列已初始化
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // CAS设置尾节点
            pred.next = node;
            return node;
        }
    }
    enq(node);                    // 队列未初始化或CAS失败,自旋入队
    return node;
}

// enq: 初始化队列并入队
private Node enq(Node node) {
    for (;;) {                    // 死循环,保证最终入队成功
        Node t = tail;
        if (t == null) {          // 队列未初始化,先初始化head节点(空哨兵)
            if (compareAndSetHead(new Node())) {
                tail = head;      // head和tail都指向空哨兵节点
            }
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;        // 成功入队
            }
        }
    }
}

// acquireQueued: 在队列中自旋等待(核心)
final boolean acquireQueued(Node node, int arg) {
    boolean interrupted = false;
    for (;;) {
        Node pred = node.prev;
        if (pred == head) {       // 关键:前驱是head说明是第一个等待线程
            boolean acquired = tryAcquire(arg);
            if (acquired) {       // 再次尝试获取锁
                setHead(node);     // 获取成功,设为新的head
                pred.next = null;  // 断开旧head的next,帮助GC
                return interrupted;
            }
        }
        // 不能获取锁:判断是否需要park
        if (shouldParkAfterFailedAcquire(pred, node)) {
            // parkAndCheckInterrupt: 阻塞线程,等待被唤醒
            // park()使用的是LockSupport.park(),底层是pthread_cond_wait
            if (parkAndCheckInterrupt()) {
                interrupted = true; // 醒来后被中断
            }
        }
    }
}

// shouldParkAfterFailedAcquire: 判断是否可以安全park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) {        // ✅ 安全:前驱会唤醒自己
        return true;
    }
    if (ws > 0) {                   // ❌ CANCELLED:跳过已取消的节点
        do {
            node.prev = pred = pred.prev; // 循环找到有效前驱
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // ws=0(PROPAGATE/CONDITION):需要确保前驱能唤醒自己
        // 将前驱设为SIGNAL(暗示需要唤醒后继)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false; // 继续自旋,下一轮再检查
}

2.2 独占模式释放锁

// public final boolean release(int arg)
public final boolean release(int arg) {
    if (tryRelease(arg)) {           // 子类实现,释放资源
        Node h = head;
        // head存在且waitStatus != 0,需要唤醒后继
        if (h != null && h.waitStatus != 0) {
            unparkSuccessor(h);      // 唤醒head的后继节点
        }
        return true;
    }
    return false;
}

// unparkSuccessor: 唤醒队列中第一个需要被唤醒的节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) {                   // 将head的waitStatus改为0(已处理)
        compareAndSetWaitStatus(node, ws, 0);
    }

    Node s = node.next;              // 后继节点
    if (s == null || s.waitStatus > 0) { // 后继为空或已取消
        // 从tail向前遍历,找到离head最近的非取消节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0) { s = t; }
        }
    }
    if (s != null) {
        LockSupport.unpark(s.thread); // 唤醒后继线程
    }
}

// ✅ 关键洞察:为什么从tail向前遍历?
// 因为enq中node.prev=t和tail=node之间可能有竞争
// 导致tail.next=null但tail.prev已设置
// 从tail向前可以找到还未完全入队的节点

三、共享模式:Semaphore与CountDownLatch

3.1 共享模式获取:doAcquireSharedInterruptibly

// Semaphore的acquire()走的是共享模式
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted()) throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) { // < 0 表示获取失败
        doAcquireSharedInterruptibly(arg);
    }
}

private void doAcquireSharedInterruptibly(int arg) {
    Node node = addWaiter(Node.SHARED); // 添加SHARED模式节点
    boolean failed = true;
    try {
        for (;;) {
            Node pred = node.prev;
            if (pred == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {          // ≥0 表示成功
                    // 关键:传播(Propagate)逻辑
                    setHeadAndPropagate(node, r);
                    pred.next = null;  // 帮助GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(pred, node) &&
                parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

// setHeadAndPropagate: 设置新head并传播唤醒
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node); // 设为新head

    // 传播条件:
    // propagate > 0:还有可用资源
    // h.waitStatus < 0:旧head需要唤醒
    // (h = head) != null:新head也需要唤醒
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) { // 后继是共享节点
            doReleaseShared(); // 继续传播,唤醒更多等待线程
        }
    }
}

// doReleaseShared: 共享模式释放(核心传播逻辑)
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // head是SIGNAL,需要唤醒后继
                if (compareAndSetWaitStatus(h, SIGNAL, 0)) {
                    unparkSuccessor(h);
                }
            } else if (ws == 0 &&
                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                // PROPAGATE:解决多线程同时释放时的传播问题
                // 保证releaseShared也能正确传播唤醒
            }
        }
        if (h == head) break; // head没变则退出
    }
}

3.2 CountDownLatch 源码

// CountDownLatch只使用共享模式,不涉及独占
// Sync是CountDownLatch的内部AQS实现
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) { setState(count); } // state=初始count

    // 获取:count减到0才能成功
    protected int tryAcquireShared(int acquires) {
        return getState() == 0 ? 1 : -1;
    }

    // 释放:count递减,递减到0时唤醒所有等待线程
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0) return false; // 已经是0了,不能再减
            int nextc = c - 1;
            if (compareAndSetState(c, nextc)) {
                return nextc == 0; // 只有减到0才返回true唤醒
            }
        }
    }
}

// await()逻辑:count>0时进入共享等待
// countDown()逻辑:每次count--,减到0时调用doReleaseShared唤醒所有节点

// 典型应用:等待多个线程完成
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(N);
// N个线程: start.await() → 工作 → end.countDown()
// 主线程: start.countDown() → end.await() → 全部完成

四、ConditionObject源码:await/signal

// ConditionObject是Object.wait/notify的替代方案
// 每个Condition维护一个独立的等待队列(不同于AQS的CLH队列)
public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter; // 条件队列头(FIFO)
    private transient Node lastWaiter;  // 条件队列尾

    // await: 将当前线程加入条件队列,并释放锁
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        Node node = addConditionWaiter(); // 加入条件队列
        int savedState = fullyRelease(node); // 释放所有已持有的锁
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) { // 不在CLH队列中
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhilePark(node)) != 0) break;
        }
        // 被signal唤醒后,重新acquireQueued竞争锁
        acquireQueued(node, savedState);
        // ...
    }

    // signal: 将条件队列的第一个节点移到CLH队列
    public final void signal() {
        if (!isHeldExclusively()) throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null) {
            doSignal(first);
        }
    }

    private void doSignal(Node first) {
        do {
            firstWaiter = first.nextWaiter;
            if (firstWaiter == null) lastWaiter = null;
            first.nextWaiter = null; // 断开条件队列连接
            // 关键:将节点从条件队列转移到CLH队列
            transferForSignal(first) ?
                (first = null) : // 成功,结束
                (first = firstWaiter); // 失败(已取消),处理下一个
        } while (first != null);
    }

    // transferForSignal: 将节点转移到CLH队列
    final boolean transferForSignal(Node node) {
        // 如果节点已被取消(waitStatus != CONDITION),返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 入CLH队列
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果前驱已取消,或前驱设为SIGNAL失败(CAS)
        // 则直接unpark当前节点(确保signal生效)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
}

// ⚠️ 生产使用注意:
// - signal()只是将节点移到CLH队列,不立即唤醒
// - 节点在CLH队列中竞争锁,成功后才继续执行
// - signalAll():将所有条件队列节点移到CLH队列
// - 与Object.wait/notify相比:Condition可以创建多个,一个锁可以有多个条件队列

五、自定义同步器实战

// 自定义互斥锁(不可重入)
public class Mutex implements Lock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        protected boolean tryAcquire(int arg) {
            // CAS将state从0设为1(成功=获取锁)
            return compareAndSetState(0, 1);
        }
        protected boolean tryRelease(int arg) {
            setState(0); // 释放锁,直接设为0
            return true;
        }
        public boolean isHeldExclusively() { return getState() == 1; }
    }

    public void lock()    { sync.acquire(1); }
    public void unlock()  { sync.release(1); }
    public boolean tryLock() { return sync.tryAcquire(1); }
    public Condition newCondition() { return sync.newCondition(); }
}

// 自定义信号量(资源池)
public class BoundedSemaphore {
    private final Sync sync;
    public BoundedSemaphore(int permits) {
        sync = new Sync(permits);
    }

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void release() {
        sync.releaseShared(1);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }
        protected int tryAcquireShared(int reduces) {
            for (;;) {
                int available = getState();
                int remaining = available - reduces;
                if (remaining < 0) return -1; // 获取失败
                if (compareAndSetState(available, remaining)) return 1; // 成功
            }
        }
        protected boolean tryReleaseShared(int returns) {
            for (;;) {
                int current = getState();
                int next = current + returns;
                if (compareAndSetState(current, next)) return true;
            }
        }
    }
}