Java多线程高并发系列:(八)ReentrantReadWriteLock读写锁源码解读

2024-11-20 00:49
345
0

一、类图结构

从下图可以看出,ReentrantReadWriterLock内部维护了一个读锁和写锁,并提供了公平锁和非公平锁的实现。

二、构造函数源码解读

如下图所示,可以看出默认读锁和写锁都是非公平锁,也可以通过构造函数传入fair字段来指定公平和非公平锁。

三、读写锁状态的存储

继续查看ReentrantReadWriteLock中的Sync类,如下图所示:读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读状态,也就是获取到读锁的次数,低16位表示获取到写线程的可重入次数。

再用如下图示,解释下一个int中存储两个count的值:

了解了这2个count的存储方式后,再次回到上文ReentrantReadWriteLock的构造函数来看一下,读写锁的实例化都传入了this,其实就是把Sync同步器对象传入进去,即读写锁用的同一个Sync同步器,读写锁等待的线程都同一个等待队列中。

四、写锁的获取与释放

WriteLock类种的lock和unlock方法如下,其实就是调用的sync中的acquire和release方法,往下追踪代码,核心的实现逻辑其实就在Sync的tryAcquire和tryRelease中。

public void lock() {
    sync.acquire(1);
}
public void unlock() {
    sync.release(1);
}

4.1、写锁的获取

如下,逐行研读一下tryAcquire方法:

protected final boolean tryAcquire(int acquires) {
    // 当前线程
    Thread current = Thread.currentThread();
    int c = getState(); // 获得状态字段,此字段中保存了读锁count和写锁count
    int w = exclusiveCount(c); // 通过状态字段解析出写线程数量
    if (c != 0) { // 状态不为0表示读写锁至少有一个数量已经不为0了
        // 1、(Note: if c != 0 and w == 0 then shared count != 0) 表明已经有线程获得了读锁,返回false。2、W!=0并且当前线程不是写锁的拥有者,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 执行到这里说明获取了写锁,判断可重入次数是否大于65535,acquires从上游传入的值为1
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 执行到这说明可重入次数不超过最大值,设置可重入次数,写锁count+1
        setState(c + acquires);
        return true;
    }
    // 执行到这里说明c=0,读写锁数量都为0。
    // writerShouldBlock方法判断写线程是否应该被阻塞,true表示阻塞。具体实现逻辑在公平锁和非公平锁中。
    // 非公平锁:写死返回false;公平锁时:如果在当前线程之前有一个排队的线程(同步队列中是否有等待时间更长的线程),则为True表示要阻塞;如果当前线程位于队列之首或队列为空,则为false
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires)) // CAS操作状态字段
        return false;
    // 执行到这说明没有阻塞,程序第一次获得锁,设置锁为当前线程所持有
    setExclusiveOwnerThread(current);
    return true;
}

代码中if (w == 0 || current != getExclusiveOwnerThread())判断了在线程持有读锁的情况下,该线程不能取得写锁,不是当前写锁拥有者的线程也不能取得写锁。

4.2、写锁的释放

写锁的释放核心逻辑在tryRelease方法中,逻辑比较简单,如下:

protected final boolean tryRelease(int releases) {
    // 若锁的持有则不是当前线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //写锁释放后新线程数state-1
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 如果写锁数量为0了,则将锁的持有者设置为null
        setExclusiveOwnerThread(null);
    // 更新写锁count
    setState(nextc);
    return free;
}

五、读锁的获取与释放

类似于写锁,读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。

5.1、读锁的获取

	protected final int tryAcquireShared(int unused) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            int c = getState(); // 获取状态值
            // 如果写锁线程数不为0且当前线程不是持有写锁的线程,则返回,(此含义其实就是锁降级:持有写锁的当前线程可以继续往下走去获取读锁)
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c); // 获得读锁数量,state右移16位
            if (!readerShouldBlock() && // 读锁是否需要等待,具体实现在公平锁和非公平锁类中
                r < MAX_COUNT && // 读数量小于最大值 65535
                compareAndSetState(c, c + SHARED_UNIT)) { // 更新读锁+1
                if (r == 0) { // r=0表示第一个读锁线程
                    firstReader = current; // 设置第一个读线程
                    firstReaderHoldCount = 1; // 第一个读锁线程用此字段保存重入数量,读线程占用资源数为1
                } else if (firstReader == current) { // 当前线程为第一个读线程,表示读线程锁重入
                    firstReaderHoldCount++; // 即读线程可重入数+1
                } else { // 读锁数量不为0,且不是当前线程,通过HoldCounter记录数量,
                    HoldCounter rh = cachedHoldCounter; // 获得计数器,内存中做一个缓存减少ThreadLocal
                    if (rh == null || rh.tid != getThreadId(current)) // 缓存计数器为null或者计数器的tid不是当前正在运行线程的tid
                        cachedHoldCounter = rh = readHolds.get(); // 获取当前线程的计数器.并加入缓存中(线程第一次执行,会调用initialValue初始化)
                    else if (rh.count == 0) // 计数器数量为0
                        readHolds.set(rh); // 加入到readHolds中保存(其实就是放到ThreadLocal中)
                    rh.count++; // 计数器数量+1
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

读锁的逻辑比写锁稍微复杂些,首先代码if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)进行了是否能锁降级的判断(当前线程持有写锁可以继续执行获得读锁),然后判断了持有锁数量的最大值限制。若没超过最大值,则会记录读锁数量和线程重入次数。
如果读线程被阻塞或者超过最大值,或者CAS更新状态值没成功,则会进入fullTryAcquireShared函数,它用来保证相关操作可以成功,下文继续查看fullTryAcquireShared方法逻辑:

		/**
         * 处理在tryAcquireShared中没有处理的CAS缺失和可重入读取。
         */
        final int fullTryAcquireShared(Thread current) {
            /*
             * 这段代码与tryacquirered中的代码有部分冗余,
             * 但总体上更简单,因为重试和惰性读取保持计数之间的交互没有使tryacquirered复杂化。
             */
            HoldCounter rh = null;
            for (;;) { // 无限循环
                int c = getState(); // 获取状态
                // 锁降级:持有写锁的当前线程可以继续往下走去获取读锁
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞
                    // 确保没有获得可重入读锁
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else { // 当前线程不是第一个读线程
                        if (rh == null) { // 计数器如果为空赋值
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 判断读锁的最大数量
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

5.2、读锁的释放

	protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread(); // 当前线程
            if (firstReader == current) { // 判断当前线程释放为第一个读线程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else { // 不是第一个线程就从ThreadLocal获取计数器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器tid不为当前正在运行的线程的tid
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) { // 无限循环,确保CAS设置状态值成功。
                int c = getState(); // 获取转台
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // 释放读锁对读锁没有影响,但是如果读锁和写锁都空闲了,则可以允许等待的写锁继续进行。
                    return nextc == 0;
            }
        }

 

 

 

 

全部评论