Lock锁的使用

"Lock锁"

Posted by ming on September 18, 2019

“If you think you can, you can. And if you think you can’t, you’re right.”

1.Lock接口介绍

锁是用于通过多个线程控制对共享资源的访问的工具。通常,锁提供对共享资源的独占访问:一次只能有一个线程可以获取锁,并且对共享资源的所有访问都要求首先获取锁。但是,一些锁可能允许并发访问共享资源,如ReadWriteLock的读写锁。

在Lock接口出现之前,Java程序是靠Synchronized关键字实现锁功能的。JDK 1.5之后并发包中新增了Lock接口以及相关实现类来实现锁功能。虽然synchronized方法和语句的范围机制使得使用监视器锁更容易编程,并且有助于避免涉及锁的许多常见编程错误,但是有时候您需要以更灵活的方式处理锁。例如,用于遍历并发访问的数据结构的一些算法需要使用“手动”或“链锁定”:您需要获取节点A的锁定,然后获取节点B,获取到B之后释放锁A并获取C,然后释放锁B并获得D等。在这种情景中Synchronized关键字就不那么容易实现了,使用Lock接口更容易的多。

Lock接口提供了synchronized关键字不具备的主要特性:

特性 描述
尝试非阻塞地获取锁 当前线程尝试获取锁,如果这一时刻没有被其他线程获取到,则成功获取并持有锁
能被中断地获取锁 获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,超过截止时间后仍旧无法获取则返回
可以绑定多个Condition对象 synchronized中,锁对象的wait()和notify()或notifyAll()方法可以实现一个隐含的条件,如果要和多于一个条件关联时,只能再加一个额外的锁,而ReentrantLock只需要多次调用newCondition方法即可。

总结一下,也就是Lock提供了比synchronized更多的功能,但是要注意以下几点:

  1. Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个接口,有具体的实现类,通过实现类来实现同步访问。
  2. Lock和Synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让出线程对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。

Lock接口的实现类具体有:

ReentrantLock, ReentrantReadWriteLock.ReadLock, ReentrantReadWriteLock.WriteLock

2. Lock接口方法介绍

Lock接口中定义了6个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//获取锁。如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获取锁。
void lock();

//获取锁,如果可用并立即返回。如果锁不可用,那么当前线程将被禁用以进行线程调度,并且处于休眠状态,和lock()方法不同的是在锁的获取中可以中断当前线程(相应中断)。
void lockInterruptibly();

//获取等待通知组件,该组件和当前绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁
Condition newCondition();

//只有在调用时才可以获得锁。如果可用,则获取锁定,并立即返回值为true;如果锁不可用,则此方法将立即返回值为false
boolean tryLock();

// 超时获取锁,当前线程在一下三种情况下会返回: 1. 当前线程在超时时间内获得了锁;2.当前线程在超时时间内被中断;3.超时时间结束,返回false.
boolean tryLock(long time, TimeUnit unit);

//释放锁
void unlock();

Lock的使用一般是以下面的模板:

1
2
3
4
5
6
7
Lock lock = new ReentrantLock();
try{
    lock.lock();//标记1
    doSomeThing();
}finally{
    lock.unlock();
}

我们可以看到,通过调用lock.lock()一句话很简单地我们就实现了代码的同步。那其具体的实现原理是什么呢,在介绍lock()方法的实现原理之前,我们先来了解一下AQS(AbstractQueuedSynchronizer).

3. AQS介绍

AQS的全称是AbstractQueuedSynchronizer,它的定位是为Java中几乎所有的锁和同步器提供一个基础框架。AQS是基于FIFO的队列实现的,并且内部维护了一个状态变量state,通过原子更新这个状态变量state即可实现加锁解锁操作。

AQS的主要内部类为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    static final class Node {
        //标识一个节点是共享模式
        static final Node SHARED = new Node();
        //标识一个节点是互斥模式
        static final Node EXCLUSIVE = null;

        // 标识线程已取消
        static final int CANCELLED =  1;
        //标识后继节点需要唤醒
        static final int SIGNAL    = -1;
        //标识线程等待在一个条件上
        static final int CONDITION = -2;
        // 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
        static final int PROPAGATE = -3;

        // 当前节点保存的线程对应的等待状态
        volatile int waitStatus;

        // 前一个节点
        volatile Node prev;

        // 后一个节点
        volatile Node next;

        // 当前节点保存的线程
        volatile Thread thread;

        // 下一个等待在条件上的节点(Condition锁时使用)
        Node nextWaiter;

        // 是否是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 获取前一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
        // 把共享模式还是互斥模式存储到nextWaiter这个字段里面了
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

很明显,是典型的双链表结构,节点中保存着当前线程、前一个节点、后一个节点以及线程的状态等信息。

下面再来看一下AQS的主要属性:

1
2
3
4
5
6
7
8
    // 队列的头节点
    private transient volatile Node head;

    // 队列的尾结点
    private transient volatile Node tail;

    // 控制加锁解锁的状态变量
    private volatile int state;

定义了一个状态变量和一个队列,状态变量用来控制加锁解锁,队列用来放置等待的线程。注意,这几个变量都要使用volatile关键字来修饰,因为是在多线程环境下操作,要保证它们的值修改之后对其它线程立即可见。

这几个变量的修改是直接使用的Unsafe这个类来操作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    // 获取Unsafe类的实例,注意这种方式仅限于jdk自己使用,普通用户是无法这样调用的
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 状态变量state的偏移量
    private static final long stateOffset;
    // 头节点的偏移量
    private static final long headOffset;
    // 尾节点的偏移量
    private static final long tailOffset;
    // 等待状态的偏移量(Node的属性)
    private static final long waitStatusOffset;
    // 下一个节点的偏移量(Node的属性)
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

我们可以看到AQS的全称是AbstractQueuedSynchronizer,它本质上是一个抽象类,说明它本质上应该是需要子类来实现的,那么子类实现一个同步器需要实现哪些方法呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    // 互斥模式下使用:尝试获取锁
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    // 互斥模式下使用:尝试释放锁
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    // 共享模式下使用:尝试获取锁
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    // 共享模式下使用:尝试释放锁
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    // 如果当前线程独占着锁,返回true
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

下面,我们基于AQS来自己实现一个锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class MyLockBaseOnAqs {

    // 定义一个同步器,实现AQS类
    private static class Sync extends AbstractQueuedSynchronizer{

        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }else {
                return false;
            }
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    //声明同步器
    private final Sync sync = new Sync();

    //加锁
    public void lock(){
        sync.acquire(1);
    }

    //解锁
    public void unlock(){
        sync.release(0);
    }

    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        MyLockBaseOnAqs lock = new MyLockBaseOnAqs();
        CountDownLatch countDownLatch = new CountDownLatch(1000);

        IntStream.range(0,1000).forEach(i -> new Thread(() ->{
            lock.lock();
            try{
                IntStream.range(0,10000).forEach(j ->{
                    count++;
                });
            }finally {
                lock.unlock();
            }

            countDownLatch.countDown();
        }, "tt-" + i).start());

        countDownLatch.await();
        System.out.println(count);
    }
}

运行main()方法总是打印出10000000(一千万),说明这个锁也是可以直接使用的,当然这是一个不可重入的锁。

4. ReentrantLock源码分析

ReentrantLock翻译为可重入锁或者再入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁。(在Java中,除了ReentrantLock以外,synchronized也是重入锁)。

4.1 继承体系

1
public class ReentrantLock implements Lock, java.io.Serializable {}

ReentrantLock实现了Lock接口,Lock接口里面定义了java中锁应该实现的几个方法:(上面已介绍)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//获取锁。如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获取锁。
void lock();

//获取锁,如果可用并立即返回。如果锁不可用,那么当前线程将被禁用以进行线程调度,并且处于休眠状态,和lock()方法不同的是在锁的获取中可以中断当前线程(相应中断)。
void lockInterruptibly();

//获取等待通知组件,该组件和当前绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁
Condition newCondition();

//只有在调用时才可以获得锁。如果可用,则获取锁定,并立即返回值为true;如果锁不可用,则此方法将立即返回值为false
boolean tryLock();

// 超时获取锁,当前线程在一下三种情况下会返回: 1. 当前线程在超时时间内获得了锁;2.当前线程在超时时间内被中断;3.超时时间结束,返回false.
boolean tryLock(long time, TimeUnit unit);

//释放锁
void unlock();

Lock接口中主要定义了 获取锁、尝试获取锁、释放锁、条件锁等几个方法。

4.2 主要内部类

ReentrantLock中主要定义了三个内部类:Sync、NonfairSync、FairSync。

1
2
3
4
5
abstract static class Sync extends AbstractQueuedSynchronizer { }

static final class NonfairSync extends Sync { }

static final class FairSync extends Sync { }
  • 抽象类Sync实现了AQS的部分方法;
  • NonfairSync实现了Sync,主要用于非公平锁的获取;
  • FairSync实现了Sync,主要用于公平锁的获取。

先简单了解一下内部类,后面在具体分析每个类中所拥有的代码。

4.3 主要属性及构造方法

1
    private final Sync sync;

主要属性就一个sync,它在构造函数中进行初始化,决定使用公平锁还是非公平锁的方式获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    // 默认构造方法
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     * 自己可以选择是使用公平锁还是非公平锁
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
  • 构造函数默认是使用的非公平锁;
  • 第二个构造函数可以自己决定使用公平锁还是非公平锁。

4.4 lock()方法

1. 公平锁

我们假设ReentrantLock的实例是通过下面的方式获取的:

1
ReentrantLock reentrantLock = new ReentrantLock(true);

给出下面主要的加锁逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
    // 1. ReentrantLock.lock()
    public void lock() {
        // 调用的sync属性的lock()方法,这里的sync是公平锁,所以是FairSync的实例
        sync.lock();
    }

    //2. ReentrantLock.FairSync.lock()
    final void lock() {
        // 调用AQS的acquire()方法获取锁
        // 注意,这里传的值为1
        acquire(1);
    }

    //3. AbstractQueuedSynchronizer.acquire()
    public final void acquire(int arg) {
        // 尝试获取锁,如果获取失败了,就排队
        if (!tryAcquire(arg) &&
            // 注意addWaiter()这里传入的节点模式为独占模式
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // ReentrantLock.FairSync.tryAcquire()
    protected final boolean tryAcquire(int acquires) {
        //当前线程
        final Thread current = Thread.currentThread();
        //查看当前变量的值+
        int c = getState();
        // 如果状态为0,说明还没有人占有锁
        if (c == 0) {
            // 如果没有其他线程在排队,那么当前线程尝试更新state的值为1
            // 如果成功了,则说明当前线程获取了锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 当前线程获取了锁,把自己设置到exclusiveOwnerThread变量中
                // exclusiveOwnerThread是AQS的父类AbstractOwnableSynchronizer中提供的变量
                setExclusiveOwnerThread(current);
                // 返回true说明成功获取到了锁
                return true;
            }
        }
        // 如果当前线程本身就占有着锁,现在又尝试获取锁,那么,直接让它获取锁并返回true
        else if (current == getExclusiveOwnerThread()) {
            // 状态变量state的值加1
            int nextc = c + acquires;
            // 如果溢出了则报错
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            // 设置到state中,这里不需要CAS更新state
            // 因为当前线程占有着锁,其他线程只会CAS把state从0更新成1,是不会成功的;所以不存在竞争,自然也就不需要使用CAS来更新
            setState(nextc);
            //当前线程获取锁成功
            return true;
        }
        // 当前线程尝试获取锁失败
        return false;
    }

    // AbstractQueuedSynchronizer.addWaiter()
    // 调用整个方法,说明上面获取锁失败了
    private Node addWaiter(Node mode) {
        // 新建一个节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 这里先尝试把新节点加到尾节点后面
        // 如果成功了就返回新节点
        // 如果没成功再调用enq()方法不断尝试
        Node pred = tail;
        // 如果尾结点不为空
        if (pred != null) {
            // 设置新节点的前置节点为现在的尾节点
            node.prev = pred;
            // CAS更新尾节点为新节点
            if (compareAndSetTail(pred, node)) {
                // 如果成功了,把旧尾节点的下一个节点指向新节点
                pred.next = node;
                //并返回新节点
                return node;
            }
        }
        //如果上面尝试入队新节点没成功,调用enq()处理
        enq(node);
        return node;
    }

    // AbstractQueuedSynchronizer.enq()
    private Node enq(final Node node) {
        // 自旋,不断尝试
        for (;;) {
            Node t = tail;
            // 如果尾结点为空,说明还未初始化
            if (t == null) { // Must initialize
                // 初始化头节点和尾结点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 如果尾结点不为空,设置新节点的前一个节点为现在的尾节点
                node.prev = t;
                // CAS更新尾结点为新节点
                if (compareAndSetTail(t, node)) {
                    // 成功了,则设置旧尾节点的下一个节点为新节点
                    t.next = node;
                    //并返回旧尾节点
                    return t;
                }
            }
        }
    }

    // AbstractQueuedSynchronizer.acquireQueued()
    // 调用上面的addWaiter()方法使得新节点已经成功入队了
    // 这个方法是尝试让当前节点来获取锁的
    final boolean acquireQueued(final Node node, int arg) {
        //失败标记
        boolean failed = true;
        try {
            // 中断标记
            boolean interrupted = false;
            // 自旋
            for (;;) {
                //当前节点的前一个节点
                final Node p = node.predecessor();
                // 如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
                // 调用ReentrantLock.FairSync.tryAcquire()方法再次尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    //尝试获取锁成功,这里同时只会有一个线程在执行,所以不需要CAS更新
                    // 把当前节点设置为新的头节点
                    setHead(node);
                    // 并把上一个节点从链表删除
                    p.next = null; // help GC
                    // 未失败
                    failed = false;
                    return interrupted;
                }
                //是否需要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //真正阻塞的方法
                    parkAndCheckInterrupt())
                    // 如果中断了
                    interrupted = true;
            }
        } finally {
            //如果失败了
            if (failed)
                cancelAcquire(node); // 取消获取锁
        }
    }

    // AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
    // 这个方式是在上面的for循环里面调用的,第一次调用会把前一个节点的等待状态设为SIGNAL,并返回false
    // 第二次调用才会返回true
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 上一个节点的等待状态
        // 注意到Node的waitStatus字段我们在上面创建的时候并没有指定,也就是说默认值是0
        // 这里给出各种等待状态
        // static final int CANCELLED = 1;
        // static final int SIGNAL = -1;
        // static final int CONDITION = -2;
        // static final int PROPAGATE = -3;
        int ws = pred.waitStatus;
        // 如果等待状态为SIGNAL(等待唤醒),直接返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 如果前一个节点的状态大于0,也就是已取消状态
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             // 把前面所有取消状态的节点都从链表中删除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
            // 这里可以简单地理解为把初始状态0设置为SIGNALL, CONDITION是条件锁的时候使用的
            // PROPAGATE是共享锁使用的
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // AbstractQueuedSynchronizer.parkAndCheckInterrupt()
    private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程
        // 底层调用的是Unsafe的park()方法
        LockSupport.park(this);
        // 返回是否已中断
        return Thread.interrupted();
    }

下面我们来看一下主要方法的调用关系:

1
2
3
4
5
6
7
8
9
ReentrantLock#lock()
-> ReentrantLock.FairSync#lock()   //公平模式获取锁
    -> AbstractQueuedSynchronizer#acquire() // AQS的获取锁办法
        -> ReentrantLock.FairSync#tryAcquire() // 尝试获取锁
        -> AbstractQueuedSynchronizer#addWaiter() //添加到队列
            -> AbstractQueuedSynchronizer#enq() // 入队
        -> AbstractQueuedSynchronizer#acquireQueued() // 里面有个for()循环,唤醒后再次尝试获取锁
            -> AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 检查是否要阻塞
            -> AbstractQueuedSynchronizer#parkAndCheckInterrupt() //真正阻塞的地方

获取锁的主要过程大致如下:

  1. 尝试获取锁,如果获取到了直接返回了;
  2. 尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;
  3. 然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;
  4. 如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);
  5. 调用parkAndCheckInterrupt()阻塞当前线程;
  6. 如果被唤醒了,会继续在acquireQueued()的for()循环再次尝试获取锁,如果成功了就返回;
  7. 如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁。

以上就是整个公平锁获取锁的过程。下面我们来看看非公平锁是怎么获取锁的。

2. 非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    // ReentrantLock.lock()
    public void lock() {
        sync.lock();
    }

    // ReentrantLock.NonfairSync.lock()
    // 这个方法在公平锁模式下是直接调用的acquire(1)
    final void lock() {
        // 直接尝试CAS更新状态变量
        if (compareAndSetState(0, 1))
            // 如果更新成功,说明获取到锁,把当前线程设为独占线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    // ReentrantLock.NonfairSync.tryAcquire()
    protected final boolean tryAcquire(int acquires) {
        //调用父类的方法
        return nonfairTryAcquire(acquires);
    }

    // ReentrantLock.Sync.nonfairTryAcquire()
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 如果状态变量的值为0,再次尝试更新CAS更新状态变量的值
            // 相对于公平锁模式少了!hasQueuedPredecessors()条件
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

相对于公平锁,非公平锁加锁的过程主要有两点不同:

  1. 一开始尝试CAS更新状态变量state的值,如果成功了就获取到锁了;
  2. 在tryAcquire()的时候没有检查是否前面有排队的线程,直接上去获取锁才不管别人有没有排队。

总的来说,相对于公平锁,非公平锁在一开始就多了两次直接尝试获取锁的过程。

4.5 lockInterruptibly()方法

支持线程中断,它与lock()方法的主要区别在于lockInterruptibly()获取锁的时候如果线程中断了,会抛出一个异常,而lock()不会管线程是否中断都会一直尝试获取锁,获取锁之后把自己标记为已中断,继续执行自己的逻辑,后面也会正常释放锁。

线程中断,只是在线程上打一个中断标志,并不会对运行中的线程有什么影响,具体需要根据这个中断标志干些什么,用户自己去决定。

比如,如果用户在调用lock()获取锁后,发现线程中断了,就直接返回了,而导致没有释放锁,这也是允许的,但是会导致这个锁一直得不到释放,就出现了死锁。

1
2
3
4
5
6
7
lock.lock();

if (Thread.currentThread().interrupted()){
    return ;
}

lock.unlock();

当然,这里只是举个例子,实际使用肯定是要把lock.lock()后面的代码都放在try…finally…里面的以保证锁始终会释放,这里主要是为了说明线程中断只是一个标志,至于要做什么完全由用户自己决定。

4.6 tryLock()方法

尝试获取一次锁,成功了就返回true,没成功就返回false,不会继续尝试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    // ReentrantLock.tryLock()
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

     // ReentrantLock.Sync.nonfairTryAcquire()
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 如果状态变量的值为0,再次尝试更新CAS更新状态变量的值
            // 相对于公平锁模式少了!hasQueuedPredecessors()条件
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

tryLock()方法比较简单,直接以非公平的模式去尝试获取一次锁,获取到了或者锁本身就是当前线程占有着就返回true,否则返回false。

tryLock(long time, TimeUnit unit)方法

尝试获取锁,并等待一段时间,如果在这段时间内都没有获取到锁,就返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
    // ReentrantLock.tryLock()
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        // 调用AQS中的方法
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    // AbstractQueuedSynchronizer.tryAcquireNanos()
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果线程中断了,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 先尝试获取一次锁
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果时间已经到期了,直接返回false
        if (nanosTimeout <= 0L)
            return false;
        // 到期时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //如果到期了直接返回false
                if (nanosTimeout <= 0L)
                    return false;
                // spinForTimeoutThresold = 1000L;
                // 只有到期时间大于1000纳秒,才阻塞
                // 小于等于1000纳秒,直接自旋解决
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // 阻塞一段时间
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryLock(long time, TimeUnit unit)方法在阻塞的时候加上阻塞时间,并且会随着检查是否到期,只要到期了没获得锁就返回false。

4.7 unlock()方法

释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
    // java.util.concurrent.locks.ReentrantLock.unlock()
    public void unlock() {
        sync.release(1);
    }

    // java.util.concurrent.locks.AbstractQueuedSynchronizer.release
    public final boolean release(int arg) {
        // 调用AQS实现类的tryRelease()方法释放锁
        if (tryRelease(arg)) {
            Node h = head;
            // 如果头节点不为空,且等待状态不是0,就唤醒下一个节点
            // 之前的shouldParkAfterFailedAcquire方法中在每个节点阻塞之前会把其上一个节点的等待状态设为SIGNAL(-1)
            // 所以,SIGNAL的准确理解应该是唤醒下一个等待的线程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 如果当前线程不是占有着锁的线程的话,抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 如果状态变量的值为0了,说明完全释放了锁
        // 这也是为什么重入锁调用了多少次lock()就要调用多少次unlock的原因
        // 如果不这样做,会导致锁不会完全释放,别的线程永远无法获取到锁
        if (c == 0) {
            free = true;
            //清空占有线程
            setExclusiveOwnerThread(null);
        }
        //设置状态变量的值
        setState(c);
        return free;
    }

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //注意,这里的node是头节点
        // 如果头节点的等待状态小于0,就把它设置为0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 头节点的下一个节点
        Node s = node.next;
        // 如果下一个节点为空,或者其等待状态大于0(实际为已取消)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从尾节点向前遍历取到队列最前面的那个状态不是已取消状态的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果下一个节点不为空,则唤醒它
        if (s != null)
            LockSupport.unpark(s.thread);
    }

释放锁的过程大致为:

  1. 将state的值减去1;
  2. 如果state减到了0,说明已经完全释放锁了,唤醒下一个等待着的节点。

4.8 条件锁

条件锁,是指在获取锁之后发现当前业务场景自己无法处理,而需要等待某个条件的出现才可以继续处理时使用的一把锁。

比如,在阻塞队列中,当队列中没有元素的时候是无法弹出一个元素的,这个时候就需要阻塞在条件notEmpty上,等待其他线程往里面放入一个元素后,唤醒这个条件notEmpty,当前线程才可以继续去做“弹出一个元素的行为”。

注意,这里的条件,必须是在获取锁之后去等待,对应到ReentrantLock的条件锁,就是获取锁之后才能调用condition.await()方法。

在Java中,条件锁的实现都在AQS的ConditionObject类中,ConditionObject实现了Condition接口,我们先来通过一个例子来看看条件锁的使用流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ReentrantLockTest {

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        //声明一个条件锁
        Condition condition = lock.newCondition();

        new Thread(() ->{
            try {
                lock.lock(); // 1
                try {
                    System.out.println("before await"); // 2
                    //等待条件
                    condition.await(); // 3
                    System.out.println("after await"); // 10
                }finally {
                    lock.unlock(); // 11
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }).start();

        //这里睡1000ms是为了让下面的线程先获取到锁
        Thread.sleep(1000);
        lock.lock(); // 4
        try{
            // 这里睡2000ms代表这个线程执行业务所需要的时间
            Thread.sleep(2000); // 5
            System.out.println("before signal"); // 6
            //通知条件已成立
            condition.signal(); // 7
            System.out.println("after signal"); // 8
        }finally {
            lock.unlock(); // 9
        }
    }
}

上面的代码流程很简单,一个线程等待条件,另一个线程通知条件已成立,后面的数字代表代码实际运行的顺序。

ConditionObject的主要属性

1
2
3
4
5
6
7
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
    }

可以看到条件锁也维护了一个队列,为了和AQS的队列区分,我们这里将其称之为条件队列,firstWaiter是队列的头节点,lastWaiter是队列的尾节点。

lock.newCondition()方法

新建一个条件锁。

1
2
3
4
5
6
7
8
9
10
11
12
    // ReentrantLock.newCondition()
    public Condition newCondition() {
        return sync.newCondition();
    }

    // ReentrantLock.Sync.newCondition()
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
    public ConditionObject() { }

新建一个条件锁最后就是调用的AQS中的ConditionObject类来实例化条件锁。

condition.await()方法

condition.await()方法,表明现在要等待条件的出现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
    // AbstractQueuedSynchronizer.ConditionObject.await()
    public final void await() throws InterruptedException {
        // 如果线程中断了,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 添加节点到Condition的队列中,并返回该节点
        Node node = addConditionWaiter();
        // 完全释放当前线程获取的锁
        // 因为锁是可重人的,所以这里需要把获取的锁全部释放掉
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 是否在同步队列中
        while (!isOnSyncQueue(node)) {
            // 阻塞当前线程
            LockSupport.park(this);

            // 上面部分是调用await()时区释放自己占有的锁,并阻塞自己等待条件的出现
            // ******************************************************************
            // 下面部分是条件已经出现,尝试去获取锁
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 尝试去获取锁,注意第二个参数;如果没有获取到锁会再次阻塞
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 清除取消的节点
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 线程中断相关
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    // AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //如果条件队列的尾结点已经取消,从头节点开始清除所有已经取消的节点
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            // 重新获取尾节点
            t = lastWaiter;
        }
        // 新建一个节点,它的等待状态时CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // 如果尾节点为空,则把新节点赋值给头节点(相当于初始化队列)
        // 否则把新节点赋值给尾节点的nextWaiter指针
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        // 尾节点指向新节点
        lastWaiter = node;
        // 返回新节点
        return node;
    }

    // AbstractQueuedSynchronizer.fullyRelease
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取状态变量的值,重复获取锁,这个值会一直累加,所以这个值也代表这获取锁的次数
            int savedState = getState();
            // 一次性释放所有获得的锁
            if (release(savedState)) {
                failed = false;
                // 返回获取锁的次数
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // AbstractQueuedSynchronizer.isOnSyncQueue
    final boolean isOnSyncQueue(Node node) {
        // 如果等待状态是CONDITION,或者前一个指针为空,则返回false
        // 说明还没有移动到AQS的队列之中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果next指针有值的话,说明已经移动到AQS的队列之中了
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        // 从AQS的尾节点开始往前寻找看是否可以找到当前节点,找到了也说明已经在AQS的队列中了
        return findNodeFromTail(node);
    }

这里面有几个难以理解的点:

  1. Condition的队列和AQS的队列不完全一样;
1
2
AQS的队列头节点是不存储任何值的,是一个虚节点;
Condition的队列头节点是存储着实实在在的元素值的,是真实节点。
  1. 各种等待状态(waitStatus)的变化:
1
2
3
4
5
首先,在条件队列中,新建节点的初始等待状态时CONDITION(-2);
其次,移到AQS的队列中时等待状态会更改为0(AQS队列节点的初始等待状态为0)
然后,在AQS的队列中如果需要阻塞,会把它上一个节点的等待状态设置为SIGNAL(-1);
最后,不管在Condition队列还是AQS队列中,已取消的节点的等待状态都会设置为CANCELLED(1);
另外,后面我们会在共享锁的时候还会讲另外一种等待状态叫PROPAGATE(-3).
  1. 相似的名称:
1
2
AQS中下一个节点是next, 上一个节点是prev;
Condition中下一个节点是nextWaiter,没有上一个节点

下面总结一下await()方法的大致流程:

  • 新建一个节点加入到条件队列中去;
  • 完全释放当前线程占有的锁;
  • 阻塞当前线程,并等待条件的出现;
  • 条件已出现(此时节点已经移到AQS的队列中),尝试获取锁。

也就是说,await()方法内部其实是先释放锁->等待条件->再次获取锁的过程。

condition.signal()方法

condition.signal()方法通知条件已经出现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
    // AbstractQueuedSynchronizer.ConditionObject.signal
    public final void signal() {
        // 如果不是当前线程占有着锁,调用这个方法抛出异常
        // 说明signal()也要在获取锁之后执行
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //条件队列的头节点
        Node first = firstWaiter;
        // 如果有等待条件的节点,则通知它条件已经成立
        if (first != null)
            doSignal(first);
    }

    // AbstractQueuedSynchronizer.ConditionObject.doSignal
    private void doSignal(Node first) {
        do {
            // 移动条件队列的头节点往后面一位
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // 相当于把头节点从队列中出队
            first.nextWaiter = null;
            // 转移动AQS队列之中
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

    // AbstractQueuedSynchronizer.transferForSignal
    final boolean transferForSignal(Node node) {
        // 把节点的状态更改为0,也就是说即将转移到AQS队列中
        // 如果失败了,说明节点已经被改成取消状态了
        // 返回false,通过上面的循环可知会寻找下一个可用节点
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 调用AQS的入队方法把节点移到AQS队列中
        // 注意,这里的enq()的返回值是node的上一个节点,也就是旧尾节点
        Node p = enq(node);
        // 上一个节点的等待状态
        int ws = p.waitStatus;
        // 如果上一个节点已经取消了,或者更新状态为SIGNAL失败(也就是上一个节点已经取消了)
        // 则直接唤醒当前节点对应的线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        // 如果更新上一个节点的等待状态为SIGNAL成功了
        // 则返回true,这时上面的循环不成立了,退出循环,也就是只通知了一个节点
        // 此时当前节点还是阻塞状态
        // 也就是说调用signal()的时候并不会真正唤醒一个节点
        // 只是把节点从条件队列中移动到AQS队列中
        return true;
    }

signal()方法的大致流程如下所示:

  1. 从条件队列的头节点开始寻找一个非取消状态的节点;
  2. 把它从条件队列移动到AQS队列;
  3. 且只移动一个节点。

注意到,这里调用signal()方法并不会真正唤醒一个节点,那么,真正唤醒一个节点是在什么时候呢?

还记得前面给出的例子不?signal()方法后,最终会调用lock.unlock()方法,此时才会真正唤醒一个节点,唤醒的这个节点如果是曾经的条件节点的话又会继续执行await()方法“分界线”下面的代码。

给出下面一个图来表示一下整个流程:

条件锁流程

4.9 总结

  1. 重入锁是指可重复获取的锁,即一个线程获取锁之后再尝试获取锁时会自动获取锁。
  2. 在ReentrantLock中可重入锁是通过不断累加state变量的值实现的。
  3. ReentrantLock的释放要跟获取次数相匹配,即获取了几次也要释放几次。
  4. ReentrantLock默认是非公平模式,因为非公平模式的效率更高。
  5. 条件锁是指为了等待某个条件出现而使用的一种锁。
  6. 条件锁比较经典的使用场景就是队列为空时阻塞在条件notEmpty上。
  7. ReentrantLock中的条件锁是通过AQS的ConditionObject内部类实现的。
  8. await()和signal()方法都必须在获取锁之后释放锁之前使用。
  9. await()方法会新建一个节点放到条件队列中,接着完全释放锁,然后阻塞当前线程并等待条件的出现。
  10. signal()方法会寻找条件队列中第一个可用节点移到AQS队列中。
  11. 在调用signal()方法的线程调用unlock()方法才真正唤醒阻塞在条件上的节点(此时节点已经在AQS队列中了)。
  12. 之后该节点会再次尝试获取锁,后面的逻辑与lock()逻辑基本一致了。

5. ReentrantLock与synchronized的对比

synchronized是Java原生提供的用于在多线程环境中保持同步的关键字,底层是通过修改对象头中的MarkWord来实现的。

ReentrantLock是Java语言层面提供的用于在多线程环境中保持同步的类,底层是通过原子更新状态变量state来实现的。

我们直接来看看这两者的对比如下所示:

功能 ReentrantLock synchronized
可重入 支持 支持
非公平 支持(默认) 支持
加锁/解锁方式 需要手动加锁,解锁,一般使用try…finally..保证锁能够被释放 手动加锁,无须刻意解锁
按key锁 不支持,比如按用户id加锁 支持,synchronized加锁时需要传入一个对象
公平锁 支持,new ReentrantLock(true) 不支持
中断 支持,lockInterruptibly() 不支持
尝试加锁 支持,tryLock() 不支持
超时锁 支持,tryLock(timeout, unit) 不支持
获取当前线程获取锁的次数 支持,getHoldCount() 不支持
获取等待的线程 支持,getWaitingThreads() 不支持
检测是否被当前线程占有 支持,isHeldByCurrentThread() 不支持
检测是否被任意线程占有 支持,isLocked() 不支持
条件锁 可支持多个条件,condition.await(), condition.signal(), condition.signalAll() 只支持一个,obj.wait(), obj.notify(), obj.notifyAll()

6. ReentrantReadWriteLock源码解析

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。(读写锁除了读读不互斥,读写、写读和写写都是互斥的)

那么,ReentrantReadWriteLock是如何实现读写锁的呢?

在我们分析源码之前,我们先来看一下ReentrantReadWriteLock这个类的主要结构:

读写锁类继承结构

ReentrantReadWriteLock中的类分成了三个部分:

  1. ReentrantReadWriteLock本身实现了ReadWriteLock,这个接口只提供了两个方法readLock()writeLock();
  2. 同步器,包含一个继承了AQS的Sync内部类,以及两个子类FairSyncNotfairSync;
  3. ReadLock和WriteLock两个内部类实现了Lock接口,它们具有锁的一些特性。

6.1 主要属性

1
2
3
4
5
6
    //读锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
    //写锁
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 同步器
    final Sync sync;

维护了读锁、写锁和同步器。

6.2 构造方法

1
2
3
4
5
6
7
8
9
10
11
    // 默认构造方法
    public ReentrantReadWriteLock() {
        this(false);
    }

    // 是否使用公平锁的构造方法
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

它们提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。

其中,获取读锁和写锁的方法如下:

1
2
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

属性中的读锁和写锁是私有属性,通过这两个方法暴露出去。

下面,我们来主要分析读锁和写锁的加锁、解锁方法,且都是基于非公平模式的。

6.3 源码分析

ReadLock.lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
    //ReentrantReadWriteLock.ReadLock.lock()
    public void lock() {
        sync.acquireShared(1);
    }

    // AbstractQueuedSynchronizer.acquireShared()
    public final void acquireShared(int arg) {
        //尝试获取共享锁(返回1表示成功,返回-1表示失败)
        if (tryAcquireShared(arg) < 0)
            // 失败了就可能要排队
            doAcquireShared(arg);
    }

    // ReentrantReadWriteLock.Sync.tryAcquireShared()
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        // 状态变量的值
        // 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数
        int c = getState();
        // 互斥锁的次数
        // 如果其他线程获得了写锁,直接返回-1
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        // 读锁被获取的次数
        int r = sharedCount(c);

        // 下面说明此时还没有写锁,尝试去更新state的值获取读锁
        // 读锁是否需要排队(是否是公平模式)
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            // 获取读锁成功
            if (r == 0) {
                // 如果之前还没有线程获取读锁
                // 记录第一个读锁为当前线程
                firstReader = current;
                // 第一个读锁重入的次数为1
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 如果有线程获取了读锁且当前线程是第一个读锁的
                // 则从把其重入次数加1
                firstReaderHoldCount++;
            } else {
                // 如果有线程获取了读锁并且当前线程不是第一个读锁的
                // 则从缓存中获取重入次数保存器
                HoldCounter rh = cachedHoldCounter;
                // 如果缓存不属于当前线程,再从ThreadLocal中获取
                // readHolds 本身是一个ThreadLocal,里面存储的是HoldCounter
                if (rh == null || rh.tid != getThreadId(current))
                    // get()的时候会初始化rh
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    // 如果rh的次数为0,把它放到ThreadLocal中去
                    readHolds.set(rh);
                // 重入的次数加1(初始次数为0)
                rh.count++;
            }
            // 获取读锁成功,返回1
            return 1;
        }
        // 通过这个方法再去尝试获取读锁(如果之前其他线程获取了写锁,一样返回-1表示失败)
        return fullTryAcquireShared(current);
    }

    // AbstractQueuedSynchronizer.doAcquireShared()
    private void doAcquireShared(int arg) {
        // 进入AQS的队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 当前节点的前一个节点
                final Node p = node.predecessor();
                // 如果前一个节点是头节点(说明是第一个排队的节点)
                if (p == head) {
                    // 再次尝试获取锁
                    int r = tryAcquireShared(arg);
                    // 如果成功了
                    if (r >= 0) {
                        // 头节点后移并传播
                        // 传播即唤醒后面连续的读节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 没获取到锁,阻塞并等待被唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // AbstractQueuedSynchronizer.setHeadAndPropagate()
    private void setHeadAndPropagate(Node node, int propagate) {
        // h为旧的头节点
        Node h = head; // Record old head for check below
        // 设置当前节点为新的头节点
        setHead(node);
        // 如果旧的头节点或新的头节点或者其等待状态小于0(表示其状态为SIGNAL/PROPAGATE)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 需要传播,取下一个节点
            Node s = node.next;
            //如果下一个节点为空,或者是需要获取读锁的节点
            if (s == null || s.isShared())
                //唤醒下一个节点
                doReleaseShared();
        }
    }

    // AbstractQueuedSynchronizer.doReleaseShared()
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 如果头节点的状态为SIGNAL,说明要唤醒下一个节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒下一个节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 把头节点的状态改为PROPAGATE成功才会跳到下面的if
                    continue;                // loop on failed CAS
            }
            // 如果唤醒后head没变,则跳出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }       

我们来看看大致的逻辑:

  1. 先尝试获取读锁;
  2. 如果成功了直接结束;
  3. 如果失败了,进入doAcquireShared()方法;
  4. doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;
  5. 如果头节点正好是当前节点的上一个节点,再次尝试获取锁;
  6. 如果成功了,则设置头节点为新节点,并传播;
  7. 传播即唤醒下一个读节点(如果下一个节点是读节点的话);
  8. 如果头节点不是当前节点的上一个节点或者步骤(5)失败,则阻塞当前线程等待被唤醒。
  9. 唤醒之后继续走步骤(5)的流程。

那么在整个逻辑中是在哪里连续唤醒读节点的呢?

其答案就是在doAcquireShared()方法中,在这里一个节点A获取了读锁之后,会唤醒下一个读节点B,这时候B也会获取读锁,然后B继续唤醒C,依次往复,也就是这里的节点是一个唤醒一个这样的形式,而不是一个节点获取了读锁之后一次性唤醒后面所有的读节点、

唤醒流程

ReadLock.unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
    // java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
    public void unlock() {
        sync.releaseShared(1);
    }

    // AbstractQueuedSynchronizer.releaseShared
    public final boolean releaseShared(int arg) {
        // 如果尝试释放成功了,就唤醒下一个节点
        if (tryReleaseShared(arg)) {
            // 这个方法实际上是唤醒下一个节点
            doReleaseShared();
            return true;
        }
        return false;
    }

    // AbstractQueuedSynchronizer.Sync.tryReleaseShared
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // 如果第一个读线程是当前线程,就把它的重入的次数减1
            // 如果减到0了就把第一个读线程置为空
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            // 如果第一个读者不是当前线程
            // 一样地,把它重入的次数减1
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            // 共享锁获取的次数减1
            // 如果减为0了说明完全释放了,才返回true
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

    // AbstractQueuedSynchronizer.doReleaseShared
    // 唤醒下一个节点
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 如果头节点状态为SIGNAL,说明要唤醒下一个节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒下一个节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                        // 把头节点的状态改为PROPAGATE成功才会跳到下面的if
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果唤醒后head没变,则跳出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }

解锁的大致流程如下:

  1. 将当前线程重入的次数减1;
  2. 将共享锁总共被获取的次数减1;
  3. 如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点。

如下图所示,ABC三个节点各获取了一次共享锁,三者释放的顺序分别为ACB,那么最后B释放共享锁的时候tryReleaseShared()才返回true,进而才会唤醒下一个节点D.

释放锁

WriteLock.lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    // ReentrantReadWriteLock.WriteLock.lock()
    public void lock() {
        sync.acquire(1);
    }

    // AbstractQueuedSynchronizer.acquire()
    public final void acquire(int arg) {
        // 先尝试获取锁
        // 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一模一样了
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // ReentrantReadWriteLock.Sync.tryAcquire()
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        // 状态变量state的值
        int c = getState();
        // 互斥锁被获取的次数
        int w = exclusiveCount(c);
        if (c != 0) {
            // 如果c!=0且w==0,说明共享锁被获取的次数不为0
            // 这句话的意思就是如果共享锁被获取的次数不为0,或者被其他线程获取了互斥锁(写锁)
            // 那么就返回false,获取写锁失败
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            // 溢出检查
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可
            setState(c + acquires);
            // 获取写锁成功
            return true;
        }
        // 如果c等于0,就尝试更新state的值(非公平模式writeShouldBlock()返回false)
        // 如果失败了,说明获取写锁失败,返回false
        // 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
// 获取写锁失败了后面的逻辑跟ReentrantLock是一致的,进入队列排队,这里就不列源码了

写锁获取的过程大致如下:

  1. 尝试获取锁;
  2. 如果有读者占有着读锁,尝试获取写锁失败;
  3. 如果有其它线程占有着写锁,尝试获取写锁失败;
  4. 如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;
  5. 如果没有线程占有着锁(state == 0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;
  6. 尝试获取锁失败以后,进入队列排队,等待被唤醒;
  7. 后续逻辑跟ReentrantLock是一致的。

WriteLock.unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    // ReentrantReadWriteLock.WriteLock.unlock()
    public void unlock() {
        sync.release(1);
    }

    // AbstractQueuedSynchronizer.release()
    public final boolean release(int arg) {
        // 如果尝试获取锁成功(完全释放锁)
        // 就尝试唤醒下一个节点
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // ReentrantReadWriteLock.Sync.tryRelease()
    protected final boolean tryRelease(int releases) {
        // 如果写锁不是当前线程占有着,则抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 状态变量的值减去1
        int nextc = getState() - releases;
        // 是否完全释放锁
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        //设置状态变量的值
        setState(nextc);
        // 如果完全释放了写锁,返回true
        return free;
    }

写锁释放的过程大致为:

  1. 先尝试释放锁,即状态变量state的值减去1;
  2. 如果减为0了,说明完全释放了锁;
  3. 完全释放了锁才唤醒下一个等待的节点。

6.4 总结

  • ReentrantReadWriteLock采用读写锁的思想,能提高并发的吞吐量;
  • 读锁使用的是共享锁,多个读锁可以一起获取锁,互相不会影响,即读读不互斥;
  • 读写、写读和写写是会互斥的,前者占有着锁,后者需要进入AQS队列中排队;
  • 多个连续的读线程是一个接着一个被唤醒的,而不是一次性唤醒所有读线程;
  • 只有多个读锁都完全释放了才会唤醒下一个写线程;
  • 只有写锁完全释放了才会唤醒下一个等待者,这个等待者有可能是读线程,也可能是写线程。