思考集结处

vuePress-theme-reco 思考集结处    2024
思考集结处 思考集结处

Choose mode

  • dark
  • auto
  • light
首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
author-avatar

思考集结处

43

文章

18

标签

首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
  • Java相关技术
  • 网络编程相关概念
  • 同步阻塞IO模型
  • 同步非阻塞IO模型
  • 多路复用IO模型
  • 异步IO模型
  • 多线程源码分析
  • 线程池
  • Spring 事件监听
  • Spring 重试机制
  • Spring 重试机制之listeners参数

多线程源码分析

vuePress-theme-reco 思考集结处    2024

多线程源码分析

思考集结处 2021-11-21 多线程

java多线程源码分析

# JUC之锁机制原理

# LockSupport原理

用于支撑JUC包用于操作线程阻塞和唤醒的工具类。可以看到底层都是通过Unsafe类来支撑。

public class LockSupport {
    private LockSupport() {}
    
    // 设置造成线程阻塞的对象信息,用于:debug、jstack
    private static void setBlocker(Thread t, Object arg) {
        UNSAFE.putObject(t, parkBlockerOffset, arg); // 去除CAS的volatile优化
    }
    
    // 唤醒当前阻塞的线程对象
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    
    // 带有提示对象blocker的阻塞线程操作
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

    // 带超时时间(绝对时间)和提示对象的阻塞线程操作
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    
    // 带超时时间(相对时间)和提示对象的阻塞线程操作
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

    // 获取阻塞对象
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }


    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // 获取unsafe需要的地址值
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

# AQS原理

  1. Abstract : 因为它并不知道怎么上锁。模板方法设计模式即可,暴露出上锁逻辑
  2. Queue:线程阻塞队列
  3. Synchronizer:同步
  4. CAS+state 完成多线程抢锁逻辑
  5. Queue 完成抢不到锁的线程排队

# acquire方法

获取写锁(互斥锁)的代码。tryAcquire方法由子类来完成,该方法也称之为模板方法,为何如此设计?这时因为AQS无法知道子类如何定义获取锁的操作。假如子类判断当前线程没有获取到锁,那么如何?排队去。addWaiter(Node.EXCLUSIVE)方法用于排队站位,acquireQueued方法用于站位后去沙发等待,此时不难猜出这里面肯定调用了tryAcquire(arg),可以想想为什么?因为在任何过程中,都有可能别的线程已经释放了锁。

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 子类判定获取锁失败返回false,那么这里取反,表示为 true
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter方法为获取失败后添加到阻塞队列。进入阻塞队列后?考虑是否睡眠?
        selfInterrupt();
}

// 子类实现获取锁的逻辑,AQS并不知道你怎么用这个state来上锁
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10

# addWaiter方法

将阻塞线程节点放入阻塞队列。采用 全路径 + 优化前置 的技巧,实现快速入队。

private Node addWaiter(Node mode) {
    // 创建等待节点:当前线程对象 + 锁模式(互斥锁?共享锁)
    Node node = new Node(Thread.currentThread(), mode);
    // 快速加入到队列
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // (面试重点,在后面进行操作时,最难理解,还有特重点在这里)特别注意:当上面的CAS成功后,有一瞬间 这里的pred.next并没有关联。会导致什么问题?有一瞬间,你通过head引用遍历的时候,是到达不了最后一个节点的,A(head)    ->     B(旧tail)   <-   C(tail)。如何获取最新的节点呢?通过tail指针往前遍历即可
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// 全路径入队方法
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 懒加载时,head和tail 分别代表aqs的头尾节点
            // 通过CAS实现原子初始化操作,直接用一个空节点实现初始化,此时head和tail指向同一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# acquireQueued方法

当加入阻塞队列后,调用该方法考虑是否将当前线程进行阻塞。在看该方法时,请考虑一个情况:假如在添加到阻塞队列后,当前锁状态是无锁时, 怎么办?那么一定时尝试获取锁。

// node节点为阻塞队列中的线程节点
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 当前节点的前驱节点
            final Node p = node.predecessor();
            // 假如p是头结点,有可能此时头结点释放了锁,那么尝试调用tryAcquire,让子类抢一下锁
            if (p == head && tryAcquire(arg)) {
                // 获取成功,更新头结点,释放引用
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 如果前驱节点不是头结点或者抢锁失败,何如?那就先判断是否应该被阻塞,如果阻塞呢?调用parkAndCheckInterrupt方法来阻塞当前线程。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 阻塞当前线程。响应中断的方式阻塞线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // this是啥意思?用于指明当前线程阻塞在哪个对象上,后期可以通过jstack命令来看到,用于排除问题
    return Thread.interrupted(); // 判断是否是通过中断的方式来唤醒的。1、unpark 2、interrupt
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# shouldParkAfterFailedAcquire方法

该方法用于判断当前线程节点是否应该阻塞。无非就是找到一个可靠(活着有效)的节点,然后将当前线程节点作为其后继节点即可。

// pred是前驱节点,node是当前线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 如果前驱节点是SIGNAL,那么此时可以安全的睡眠(因为SIGNAL状态,代表了上一个线程是活的,它可以通知你,所以当前线程节点可以安全的阻塞了)
        return true;
    if (ws > 0) {
        // 前一个节点是CANCEL无效节点,那咋整?一直往前找,直到找到(有效节点)
        do {
            // 前驱节点的前驱节点,也即:踩着死亡节点往前跳
            Node predPrev = pred.prev;
            // 将前驱的前驱当成当前线程节点的前驱节点
            pred = predPrev;
            // 更新引用
            node.prev = pred;
        } while (pred.waitStatus > 0);
        // 更新找到的前驱有效节点的next引用,指向当前节点
        pred.next = node;
    } else {
        // 正常节点,那么CAS 将其变为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# Node内部类

该内部类用于作用AQS的阻塞队列,封装线程节点。

static final class Node {
    // 标志当前线程节点阻塞状态为:共享节点
    static final Node SHARED = new Node();
    // 标志当前线程节点的阻塞状态为:互斥节点
    static final Node EXCLUSIVE = null;

    //  唯一的大于0的节点,表示当前节点已经被取消,属于无效节点
    static final int CANCELLED =  1;
    // 表明当前节点是活动节点,可以唤醒后继的等待节点
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    // 用于共享锁唤醒后继节点标志位用
    static final int PROPAGATE = -3;

    // 代表了当前节点的状态值,简称ws,取值为以上4个
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

# release方法

释放写锁(互斥锁)的代码。

public final boolean release(int arg) {
    // 子类判定释放锁成功
    if (tryRelease(arg)) {
        // 检查阻塞队列唤醒即可
        Node h = head;
        if (h != null && // AQS队列从来没被使用过
            h.waitStatus != 0) // 那就是SIGNAL=-1
            // 头结点为有效节点,且标注于需要唤醒后继节点,那么唤醒即可
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 子类实现获取锁的逻辑,AQS并不知道你怎么用这个state来上锁
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# unparkSuccessor方法

该方法用于释放node节点后的有效后继节点。说白了,就是从node节点往后找到有效地节点,唤醒即可。

private void unparkSuccessor(Node node) {
    // 开始唤醒后继节点,当前头节点,且ws=SINGAL,CAS将其变为0,代表了我当前已经响应了这一次的唤醒操作
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 取当前头结点的后继节点,作为唤醒节点,但是,请注意条件
    Node s = node.next;
    if (s == null ||  // 为什么这里有可能为空?因为我们首先更新的是tail引用,然后再是旧的tail.next所以有可能一瞬间为空
        s.waitStatus > 0) { // 后继节点居然是无效节点???
        s = null;
        // tail引用一定是最新的引用,那么从后往前找到 第一个(node节点的后继的第一个) 有效节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到了s节点,此时s就是要唤醒的节点
    if (s != null)
        LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# acquireShared方法

该方法用于获取共享锁。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 由子类判断是否获取锁成功,若不成功?该阻塞阻塞去
        doAcquireShared(arg);
}

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 和互斥锁一样,只不过呢?这里添加的是共享模式的锁节点
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 刚好这个节点就在头结点后面,可能头结点很快就释放锁了,那么尝试获取锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 如果获取锁成功,尝试唤醒后面的共享节点(因为共享锁是可以多线程同时获取,参考下:读写锁实现的读锁)
                // A(head 写)->B(读)->C(读)
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 将当前获取锁的共享节点更新头部,然后唤醒后继节点
                    p.next = null; // help GC
                    if (interrupted) // 如果在等待过程中发生了中断,由于中断而被唤醒,那么置位当前线程的中断标志位
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 用于更新头结点,并且唤醒后继共享节点(面试重点,难点)
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || // 信号量还有多余的,那么直接唤醒。eg:A:2 head->B-C 
        h == null || // 不可能发生
        h.waitStatus < 0 || // SIGNAL 表明必须唤醒后继节点 那么直接唤醒
        (h = head) == null || // 不可能发生。A B (获取锁) head  -> D 
        h.waitStatus < 0) { // SIGNAL 表明必须唤醒后继节点 那么直接唤醒
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

// 释放共享锁节点。(难点,核心点,面试点)。eg:当前状态:A B (获取锁) head -> C -> D 
private void doReleaseShared() {
    for (;;) {
        Node h = head; // 保存head临时变量(两个状态:head没有被更新、head被更新)
        if (h != null && h != tail) { // 链表中还存在等待节点
            int ws = h.waitStatus;
            // 理应要唤醒后面的节点,因为SIGNAL的语义就是必须唤醒后面的节点(正常状态)
            if (ws == Node.SIGNAL) {
                // CAS将该状态由SIGNAL改为0,表示唤醒成功,那么如果失败呢?此时表明多个线程同时释放了共享锁,都需要同时唤醒后继节点,此时state 状态为 2 表明了资源数可用为2,需要唤醒后面两个等待的共享节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 直接唤醒后继节点即可
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // PROPAGATE = -3 若CAS成功,表明A释放了一个共享锁时唤醒了C,此时在C还没有更新头节点之前,A马上又释放了一个共享锁,且CAS成功将头结点的状态改为了PROPAGATE,此时完美保证了第一个状态唤醒的正确性
                continue;
        }
        if (h == head) // 头结点至今没被改变过,且 h != null && h != tail 成功,那么直接退出循环,没有节点了
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

# cancelAcquire方法

取消的这个在竞争队列上的节点有几种状态:

  1. 在队列尾部
  2. 在队列中间
  3. 在队列前面,头结点的后面
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null; 
    // 找到一个合适的前驱节点(未被cancel的节点)
    Node pred = node.prev;
    while (pred.waitStatus > 0) // 也即ws不大于 cancel的状态 1
        node.prev = pred = pred.prev;
    // 有效前驱节点的后继节点
    Node predNext = pred.next;
    // 标识节点是无效节点,此时其他线程将会看到该状态,同时越过该节点处理
    node.waitStatus = Node.CANCELLED;
    if (node == tail &&    //   当前删除的节点  可能(删除时可以并发插入)   是尾结点
        compareAndSetTail(node, pred)) { // 若当前快照看到了是尾结点,那么CAS尝试删除(修改tail指针为pred)
        compareAndSetNext(pred, predNext, null); // cas修改next指针
    } else {
        int ws;
        if (pred != head && // 被取消的节点不在头结点的后面
            ((ws = pred.waitStatus) == Node.SIGNAL || // pred需要唤醒后继节点
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) { // 线程对象必须不为空,因为只有线程对象存在的情况下该节点才是最终有效节点
            Node next = node.next;
            // 当前被删除节点的后继节点必须时有效节点
            if (next != null && next.waitStatus <= 0)
                // A->B(CANCEL)->C    A->C
                compareAndSetNext(pred, predNext, next); // 尝试修改pred前驱节点的next指向node的后继节点
        } else {
            // 唤醒后继节点即可
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# ConditionObject原理

# 核心变量定义与构造器原理

根据变量定义,公用AQS的Node节点。

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

public class ConditionObject implements Condition {
    /** 条件阻塞队列的头结点 */
    private transient Node firstWaiter;
    /** 条件阻塞队列的尾结点 */
    private transient Node lastWaiter;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# await方法原理
public final void await() throws InterruptedException {
    // 检测中断
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 生成一个Node等待节点将其放入条件阻塞队列
    int savedState = fullyRelease(node); // 调用release操作唤醒竞争队列的节点。注意:当前线程的state变量,也即控制可重入的变量需要保存,因为在后面唤醒后要恢复状态
    int interruptMode = 0;
    // 节点未放入到AQS的竞争队列之前一直阻塞
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 响应中断
            break;
    }
    // 开始竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# addConditionWaiter方法原理
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 尾结点被取消了,那么将其从节点中干掉
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建等待节点,模式为CONDITION,表明等待在条件变量上
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 标准链表操作
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# fullyRelease方法原理
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState(); // 先获取当前状态
        if (release(savedState)) { // 释放所有状态,并且唤醒后继节点
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 如果失败,那么将节点状态变为CANCELLED即可
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# isOnSyncQueue方法原理
final boolean isOnSyncQueue(Node node) {
    // waitStatus等于CONDITION一定在条件阻塞队列上,prev为null,一定不在竞争队列上?因为竞争队列上的节点prev一定不为空,且prev节点的状态可能为SIGNAL。有同学说:头结点的prev为null,那么我问你:头结点在等待队列上吗?头结点代表了什么?代表了假节点,仅仅只是为了保证获取到后继的节点而设立的,如果硬要说有意义,那么一定是代表了当前获取到了锁的节点
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 若节点的next节点不为空,那么一定在AQS 竞争队列上
    if (node.next != null)
        return true;
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    // 从尾指针开始遍历往前遍历直到找到该节点为止
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# checkInterruptWhileWaiting原理

其实特殊处理的就是:线程被中断返回,而不是正常unpark返回。如果正常返回该方法返回0。

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT)  : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // CAS将节点状态修改为0,初始状态
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node); // 添加到竞争队列
        return true;
    }
    // 等待该节点进入竞争队列
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# unlinkCancelledWaiters
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# signal方法原理

唤醒线程将等待节点从等待节点上移动到竞争队列,尽量给它找一个很好地归宿,如果没有找到,那么唤醒它自己找吧。

public final void signal() {
    // 当前线程一定要持有互斥锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); 
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    // CAS避免中断唤醒竞争
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    // 当添加到竞争队列之后,节点的状态有可能会改变,但是切记这里返回的前驱节点
    int ws = p.waitStatus;
    if (ws > 0 ||  // 前驱节点已经被取消,为无效节点(此操作为优化操作)
        !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); // 唤醒等待条件变量的线程,让其调用acquireQueued方法将自己调整位置并再次阻塞或者获取锁
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 总结

子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS来完成。

# ReentrantLock 原理

# 概念

基于AQS实现的可重入锁实现类。

# 核心变量和构造器

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    public ReentrantLock() {
        // 默认为非公平锁。为何默认为非公平锁?因为通过大量测试下来,发现非公平锁的性能优于公平锁
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        // 由fair变量来表明选择锁类型
        sync = fair ? new FairSync() : new NonfairSync();
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
        // 非公平锁标准获取锁方法
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 当执行到这里时,正好获取所得线程释放了锁,那么可以尝试抢锁
            if (c == 0) {
                // 继续抢锁,不看有没有线程排队
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程就是持有锁的线程,表明锁重入
            else if (current == getExclusiveOwnerThread()) {
                // 利用state整形变量进行次数记录
                int nextc = c + acquires;
                // 如果超过了int表示范围,表明符号溢出,所以抛出异常0111 1111 + 1 = 1000 0000 
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 返回false 表明需要AQS来将当前线程放入阻塞队列,然后进行阻塞操作等待唤醒获取锁
            return false;
        }

        // 公平锁和非公平锁公用方法,因为在释放锁的时候,并不区分是否公平
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            // 如果当前线程不是上锁的那个线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 不是重入锁,那么当前线程一定是释放锁了,然后我们把当前AQS用于保存当前锁对象的变量ExclusiveOwnerThread设置为null,表明释放锁成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            // 注意:此时state全局变量没有改变,也就意味着在setState之前,没有别的线程能够获取锁,这时保证了以上的操作原子性
            setState(c);
            // 告诉AQS,我当前释放锁成功了,你可以去唤醒正在等待锁的线程了
            return free;
        }

        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

    }

    static final class NonfairSync extends Sync {
        // 由ReentrantLock调用获取锁
        final void lock() {
            // 非公平锁,直接抢锁,不管有没有线程排队
            if (compareAndSetState(0, 1))
                // 上锁成功,那么标识当前线程为获取锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 抢锁失败,进入AQS的标准获取锁流程
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            // 使用父类提供的获取非公平锁的方法来获取锁
            return nonfairTryAcquire(acquires);
        }
    }

    static final class FairSync extends Sync {
        // 由ReentrantLock调用
        final void lock() {
            // 没有尝试抢锁,直接进入AQS标准获取锁流程
            acquire(1);
        }
        
        // AQS调用,子类自己实现获取锁的流程
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 此时有可能正好获取锁的线程释放了锁,也有可能本身就没有线程获取锁
            if (c == 0) {
                // 注意:这里和非公平锁的区别在于:hasQueuedPredecessors看看队列中是否有线程正在排队,没有的话再通过CAS抢锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    // 抢锁成功
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程就是获取锁的线程,那么这里是锁重入,和非公平锁操作一模一样
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 返回false 表明需要AQS来将当前线程放入阻塞队列,然后进行阻塞操作等待唤醒获取锁
            return false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

# 核心方法

  1. 获取锁操作

    public void lock() {
        // 直接通过sync同步器上锁
        sync.lock();
    }
    
    1
    2
    3
    4
  2. 释放锁操作

    public void unlock() {
        sync.release(1);
    }
    
    
    1
    2
    3
    4

# ReentrantReadWriteLock 原理

# 用例

将原来的锁,分割为两把锁:读锁、写锁。适用于读多写少的场景,读锁可以并发,写锁与其他锁互斥。写写互斥、写读互斥、读读兼容。

public class ThreadDemo {
    static volatile int a;

    public static void readA() {
        System.out.println(a);
    }

    public static void writeA() {
        a++;
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
        Thread readThread1 = new Thread(() -> {
            readLock.lock();
            try {
                readA();
            } finally {
                readLock.unlock();
            }

        });
        Thread readThread2 = new Thread(() -> {
            readLock.lock();
            try {
                readA();
            } finally {
                readLock.unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            writeLock.lock();
            try {
                writeA();
            } finally {
                writeLock.unlock();
            }
        });

        readThread1.start();
        readThread2.start();
        writeThread.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 核心变量和构造器

该接口用于获取读锁和写锁对象

public interface ReadWriteLock {
    // 用于获取读锁
    Lock readLock();
    // 用于获取写锁
    Lock writeLock();
}
1
2
3
4
5
6

readerLock和writerLock变量用于支撑以上描述的ReadWriteLock接口的读锁和写锁方法。通过构造方法得知,读写锁对象的创建和用例均依赖于公平锁或者非公平锁同步器。

public class ReentrantReadWriteLock implements ReadWriteLock {
    // 读锁对象
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 写锁对象
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 同步器
    final Sync sync;
    // 默认构造器,创建了非公平锁
    public ReentrantReadWriteLock() {
        this(false);
    }
    // 根据fair变量,来选择创建不同的锁:公平锁 FairSync 和非公平锁 NonfairSync
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        // 用同步器来创建读写锁对象
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# Sync类

核心变量和构造器

我们说读锁可以多个线程同时持有,而写锁只允许一个线程持有,此时我们称 读锁-----共享锁 写锁------互斥锁(排他锁)。然后我们在AQS中了解到一个变量state,它是32位的值,那么我们这里将其切割为高16位和低16位。

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 高16位用于表示读锁
    static final int SHARED_SHIFT   = 16;
    // 用于对高16位操作:加1 减1 
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 最大读锁量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 用于获取低16位的值。例如 获取低八位:0000 0000 1111 1111
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** 获取当前持有读锁的线程数量  */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** 获取当前持有写锁的线程数量 */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    // 高16位为所有读锁获取,那么我想知道每个线程对于读锁重入的次数?采用ThreadLocal来进行统计,每个线程自己统计自己的
    static final class HoldCounter {
        int count = 0;
        final long tid = getThreadId(Thread.currentThread());
    }
    // 继承自ThreadLocal,重写了其中的initialValue方法,该方法将在线程第一次获取该变量时调用初始化HoldCounter计数器
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    // 创建ThreadLocal对象
    private transient ThreadLocalHoldCounter readHolds;
    // 缓存最后一个线程获取的读锁数量
    private transient HoldCounter cachedHoldCounter;
    // 保存获取到该锁的第一个读锁线程
    private transient Thread firstReader = null;
    // 保存第一个该锁的第一个读锁线程获取到的读锁数量
    private transient int firstReaderHoldCount;

    Sync() {
        // 构造器中初始化ThreadLocalHoldCounter ThreadLocal对象
        readHolds = new ThreadLocalHoldCounter();
        // 用于保证可见性,使用了state变量的volatile语义
        setState(getState()); 
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# tryAcquire获取写锁的流程

由AQS调用,用于子类实现自己的上锁逻辑,和原有获取互斥锁保持一致,

protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 获取当前状态值和互斥锁的数量
    int c = getState();
    int w = exclusiveCount(c);
    // 状态值有效
    if (c != 0) {
        // 有线程获取到了读锁或者当前线程不是持有互斥锁的线程
        if (w == 0 ||  // 有线程获取到了读锁
            current != getExclusiveOwnerThread()) // 有线程获取到了写锁
            // 返回false 让AQS执行阻塞操作
            return false;
        // 写锁重入,而又由于写锁的数量保存在低16位,所以直接加就行了
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires);
        return true;
    }
    // 既没有读锁,也没有写锁
    if (writerShouldBlock() || // 由子类实现判断当前线程是否应该获取写锁
        !compareAndSetState(c, c + acquires)) // 通过CAS抢写锁
        return false;
    // 获取写锁成功,那么将当前线程标识为获取互斥锁的线程对象
    setExclusiveOwnerThread(current);
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# tryAcquireShared获取读锁的流程获取写锁的流程

protected final int tryAcquireShared(int unused) {
    // 获取到当前线程对象
    Thread current = Thread.currentThread();
    // 获取到当前状态值
    int c = getState();
    if (exclusiveCount(c) != 0 && // 有没有线程持有写锁
        getExclusiveOwnerThread() != current) // 如果有线程获取到了互斥锁,那么进一步看看是不是当前线程
        // 不是当前线程,那么直接返回-1,告诉AQS获取共享锁失败
        return -1;
    // 获取到读锁的持有数量
    int r = sharedCount(c);
    if (!readerShouldBlock() && // 让子类来判定当前获取读锁的线程是否应该被阻塞
        r < MAX_COUNT && // 判断是否发生了溢出
        compareAndSetState(c, c + SHARED_UNIT)) { // 直接CAS 增加state的高16位的读锁持有数量
        // 增加高16位之前的计数为0,此时表明当前线程就是第一个获取读锁的线程
        if (r == 0) {
            // 注意:持有两个变量来优化threadlocal 
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 当前获取读锁的线程就是一个线程,那么此时表明:锁重入,直接++计数位即可
            firstReaderHoldCount++;
        } else {
            // 当前线程不是第一个读线程,此时将其获取读锁的次数保存在ThreadLocal中
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 有很多同学走到这里,直接懵逼?不知道这是啥情况?经验:在看doug lea写的代码时,请注意:经常做优化,就是把一些常见的场景前置,保证性能
    return fullTryAcquireShared(current);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# fullTryAcquireShared完全获取读锁流程

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 当前已经有线程获取到写锁且当前获取写锁的线程不是,当前线程
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 子类判断当前线程应该阻塞
            if (firstReader == current) {
                // 当前线程就是第一个获取到读锁的线程
            } else {
                // 获取到当前线程记录读锁重入次数的HoldCounter对象
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 当前读锁重入次数为0时,表明没有获取读锁,此时返回-1,阻塞当前线程
                if (rh.count == 0)
                    return -1;
            }
        }
        // 读锁获取次数溢出
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // CAS增加读锁次数
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh;
            }
            return 1;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# tryRelease释放写锁的流程

protected final boolean tryRelease(int releases) {
    // 没有获取写锁,为啥能释放写锁呢?
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    // 释放完毕后,写锁状态是否为0(锁重入),因为此时计算的不是当前state,是nextc
    boolean free = exclusiveCount(nextc) == 0;
    // 如果下一个状态值为0,此时表明当前线程完全释放了锁,也即锁重入为0,那么将当前线程对象从OwnerThread中移除
    if (free)
        setExclusiveOwnerThread(null);
    // 此时设置全局state变量即可
    setState(nextc);
    // 如果返回为true,那么由AQS完成后面线程的唤醒
    return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# tryReleaseShared释放读锁的流程

释放时,需要考虑:重入多少次,就释放多少次。总结:先完成自己的释放,然后再完成共享的高16位的释放。

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 当前线程是第一个获取到读锁的线程
    if (firstReader == current) {
        // 当前重入次数为1,代表什么?代表可以直接释放,如果不是1,那么表明还持有多个读锁,也即重入多次,那么直接--
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 当前线程已经释放完读锁,那么不需要在ThreadLocal里持有HoldCounter对象
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        // CAS释放高16位计数
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 释放完毕后是否为0,为无锁状态,此时需要干啥?由AQS来唤醒阻塞的线程
            return nextc == 0;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# readerShouldBlock和writerShouldBlock模板方法公平锁实现

判断条件只有一个:hasQueuedPredecessors()方法,就是看看AQS的阻塞队列里是否有其他线程正在等待,如果有排队去。总结:有人在排队,那么不插队。w->r->r->r 此时来了个r:w->r->r->r->r, 此时来了个w:w->r->r->r->w。

static final class FairSync extends Sync {
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    } // w->r->r   r获取锁  w->r->r-r
}
1
2
3
4
5
6
7
8

# readerShouldBlock和writerShouldBlock模板方法非公平锁实现

写线程永远false,因为读写锁本身适用的是读多写少,此时不应该 让写线程饥饿,而且非公平,写锁永远不阻塞,让它抢,不管前面是否有人排队,先抢了再说。apparentlyFirstQueuedIsExclusive()第一个排队的是不是写线程。r(10),当前线程是第十一个,此时已经有一个写线程排队,r(10)->w,此时排队去。r(10)->w->r。

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() {
        return false;
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    } // w->r->r   r获取锁  r->r->r
}
1
2
3
4
5
6
7
8

# Semaphore原理

# 定义

得知:使用AQS作为模板类,然后使用其共享锁机制,实现了公平锁和非公平锁来完成Semaphore信号量语义。获取permit的acquire(int permits)操作、释放permit的release(int permits)操作,均是由sync类来完成的

public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    static final class FairSync extends Sync {}
    static final class NonfairSync extends Sync {}
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# acquireSharedInterruptibly方法原理

该代码由AQS完成,我们不做过多赘述,主要看这里的Interruptibly的意思,此时相当于响应线程的中断。最后还是调用子类的tryAcquireShared模板方法,让子类实现自己获取信号量的机制。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
}
1
2
3
4
5
6
7
8
9

我们先来看公平锁的实现:

state变量用于标识permit值。

static final class FairSync extends Sync {
    // 构造器用于初始化父类AQS的state变量
    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果有线程在AQS的队列中排队,那么返回-1,将由AQS完成阻塞操作
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            // 如果此时有可用的多余的信号量,那么进行CAS操作,如果失败,那么返回剩下的资源数。如果此时CAS成功,那么返回的资源数就为当前值,有可能为0或者大于0
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

非公平锁的实现:

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }
    
    // 由父类Sync来完成调用
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

// sync类实现方法
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 非公平锁直接CAS抢即可,直到可用资源数小于0
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# releaseShared方法原理

AQS实现方法,我们可以看到当模板方法tryReleaseShared,由子类完成释放后,那么将会调用doReleaseShared方法唤醒后面等待的线程。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9

直接看Sync类的实现:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 直接通过CAS操作对state变量+1即可
        int current = getState();
        int next = current + releases;
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11

# CountDownLatch原理

# 定义

是一个同步器,用于一个或者多个线程等待其他线程完成一组操作,原理如下:

1、AQS的state变量用于表示操作个数

2、AQS的共享锁机制完成唤醒

3、等待锁的线程使用acquireShared方法获取共享锁等待

4、操作线程使用releaseShared方法用于唤醒等待共享锁的线程

# 构造器原理

从构造器中得出,count值和核心操作均由内部同步器类Sync完成

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
1
2
3
4

# CountDown方法原理

public void countDown() {
    sync.releaseShared(1);
}
1
2
3

# Await方法原理

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
1
2
3
4

# tryAcquireSharedNanos方法原理

该方法由AQS实现,可以看到方法中抛出了InterruptedException中断异常,由此可见该方法响应了线程中断,但是核心操作还是由子类来实现tryAcquireShared(arg)来完成共享锁的获取操作。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout); // 该方法在后面说AQS时完成讲解
    }
}
1
2
3
4
5
6
7
8
9

# tryAcquireShared方法实现

就是看变量是否为0,如果为0,那么无条件返回1,此时将会直接获取到共享锁。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
1
2
3

# releaseShared方法

该方法由AQS来实现,可以看到通过子类完成tryReleaseShared方法释放共享锁,如果释放成功,那么直接调用doReleaseShared方法完成等待获取共享锁的线程,获取共享锁。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9

# tryReleaseShared方法实现过程

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前state值,代表了完成的操作个数
        int c = getState();
        if (c == 0)
            return false;
        // 计算更新值,CAS原子性的修改即可
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            // 若修改成功,那么判断当前线程是不是最后一个完成操作的线程,如果是,那么返回true,此时唤醒所有等待共享锁的线程
            return nextc == 0;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# CyclicBarrier原理

# 核心数据结构与构造器原理

public class CyclicBarrier {
    // 该类用于reset后复用该结构,每一次的party都会生成一个新的该类的实例
    private static class Generation {
        boolean broken = false; // 当前party有没有被强制中断
    }
    private final ReentrantLock lock = new ReentrantLock();
    // 用于阻塞线程的条件变量:有未到party的线程,那么等待在该条件变量上
    private final Condition trip = lock.newCondition();
    // 参与party的线程数
    private final int parties;
    // 当所有的线程都参与到了party中后回调的方法
    private final Runnable barrierCommand;
    // 当前party
    private Generation generation = new Generation();
    // 还未到party的线程数
    private int count;
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# await方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;  // 保存当前party时的Generation快照,更新后将不会影响这里的实例
        if (g.broken)
            throw new BrokenBarrierException();
        // 有中断的线程混入其中,干掉其他参会人重新开始,此时并没有改变party的Generation
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        int index = --count;
        if (index == 0) {  // 最后一个到达party的线程,负责唤醒所有阻塞在条件变量上的线程,然后回调barrierCommand(若正常完成,那么不需要手动调用reset,因为这里调用了nextGeneration)
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration(); // 进入下一个party
                return 0;
            } finally {
                // barrierCommand回调方法发生了异常,那么设置broken标志位
                if (!ranAction)
                    breakBarrier();
            }
        }
        // 循环等待最后一个参与party的线程唤醒自己,或者?被中断。或者?等待超时
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

# reset方法

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier(); // 将所有参与party的线程唤醒
        nextGeneration();  // 生成下一代
    } finally {
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10

# breakBarrier方法

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
1
2
3
4
5

# nextGeneration方法

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation(); // 生成了下一代party实例
}
1
2
3
4
5
我是思考集结处欢迎你的关注
看板娘