這一節主要是談談讀寫鎖的實現。
上一節中提到,ReadWriteLock看起來有兩個鎖:readLock/writeLock。如果真的是兩個鎖的話,它們之間又是如何相互影響的呢?
事實上在ReentrantReadWriteLock里鎖的實現是靠java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的。這個類看起來比較眼熟,實際上它是AQS的一個子類,這中類似的結構在CountDownLatch、ReentrantLock、Semaphore里面都存在。同樣它也有兩種實現:公平鎖和非公平鎖,也就是java.util.concurrent.locks.ReentrantReadWriteLock.FairSync和java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync。這里暫且不提。
在ReentrantReadWriteLock里面的鎖主體就是一個Sync,也就是上面提到的FairSync或者NonfairSync,所以說實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一樣,所以前面才有讀寫鎖是獨占鎖的兩個不同視圖一說。
ReentrantReadWriteLock里面有兩個類:ReadLock/WriteLock,這兩個類都是Lock的實現。
清單1 ReadLock 片段
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
清單2 WriteLock 片段
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
清單1描述的是讀鎖的實現,清單2描述的是寫鎖的實現。顯然WriteLock就是一個獨占鎖,這和ReentrantLock里面的實現幾乎相同,都是使用了AQS的acquire/release操作。當然了在內部處理方式上與ReentrantLock還是有一點不同的。對比清單1和清單2可以看到,ReadLock獲取的是共享鎖,WriteLock獲取的是獨占鎖。
在AQS章節中介紹到AQS中有一個state字段(int類型,32位)用來描述有多少線程獲持有鎖。在獨占鎖的時代這個值通常是0或者1(如果是重入的就是重入的次數),在共享鎖的時代就是持有鎖的數量。在上一節中談到,ReadWriteLock的讀、寫鎖是相關但是又不一致的,所以需要兩個數來描述讀鎖(共享鎖)和寫鎖(獨占鎖)的數量。顯然現在一個state就不夠用了。于是在ReentrantReadWrilteLock里面將這個字段一分為二,高位16位表示共享鎖的數量,低位16位表示獨占鎖的數量(或者重入數量)。2^16-1=65536,這就是上節中提到的為什么共享鎖和獨占鎖的數量最大只能是65535的原因了。
有了上面的知識后再來分析讀寫鎖的獲取和釋放就容易多了。
清單3 寫入鎖獲取片段
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if ((w == 0 && writerShouldBlock(current)) ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
清單3 是寫入鎖獲取的邏輯片段,整個工作流程是這樣的:
- 持有鎖線程數非0(c=getState()不為0),如果寫線程數(w)為0(那么讀線程數就不為0)或者獨占鎖線程(持有鎖的線程)不是當前線程就返回失敗,或者寫入鎖的數量(其實是重入數)大于65535就拋出一個Error異常。否則進行2。
- 如果當且寫線程數位0(那么讀線程也應該為0,因為步驟1已經處理c!=0的情況),并且當前線程需要阻塞那么就返回失敗;如果增加寫線程數失敗也返回失敗。否則進行3。
- 設置獨占線程(寫線程)為當前線程,返回true。
清單3 中 exclusiveCount(c)就是獲取寫線程數(包括重入數),也就是state的低16位值。另外這里有一段邏輯是當前寫線程是否需要阻塞writerShouldBlock(current)。清單4 和清單5 就是公平鎖和非公平鎖中是否需要阻塞的片段。很顯然對于非公平鎖而言總是不阻塞當前線程,而對于公平鎖而言如果AQS隊列不為空或者當前線程不是在AQS的隊列頭那么就阻塞線程,直到隊列前面的線程處理完鎖邏輯。
清單4 公平讀寫鎖寫線程是否阻塞
final boolean writerShouldBlock(Thread current) {
return !isFirst(current);
}
清單5 非公平讀寫鎖寫線程是否阻塞
final boolean writerShouldBlock(Thread current) {
return false;
}
寫入鎖的獲取邏輯清楚后,釋放鎖就比較簡單了。清單6 描述的寫入鎖釋放邏輯片段,其實就是檢測下剩下的寫入鎖數量,如果是0就將獨占鎖線程清空(意味著沒有線程獲取鎖),否則就是說當前是重入鎖的一次釋放,所以不能將獨占鎖線程清空。然后將剩余線程狀態數寫回AQS。
清單6 寫入鎖釋放邏輯片段
protected final boolean tryRelease(int releases) {
int nextc = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
if (exclusiveCount(nextc) == 0) {
setExclusiveOwnerThread(null);
setState(nextc);
return true;
} else {
setState(nextc);
return false;
}
}
清單3~6 描述的寫入鎖的獲取釋放過程。讀取鎖的獲取和釋放過程要稍微復雜些。 清單7描述的是讀取鎖的獲取過程。
清單7 讀取鎖獲取過程片段
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (!readerShouldBlock(current) &&
compareAndSetState(c, c + SHARED_UNIT)) {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
rh.count++;
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
for (;;) {
int c = getState();
int w = exclusiveCount(c);
if ((w != 0 && getExclusiveOwnerThread() != current) ||
((rh.count | w) == 0 && readerShouldBlock(current)))
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
cachedHoldCounter = rh; // cache for release
rh.count++;
return 1;
}
}
}
讀取鎖獲取的過程是這樣的:
- 如果寫線程持有鎖(也就是獨占鎖數量不為0),并且獨占線程不是當前線程,那么就返回失敗。因為允許寫入線程獲取鎖的同時獲取讀取鎖。否則進行2。
- 如果讀線程請求鎖數量達到了65535(包括重入鎖),那么就跑出一個錯誤Error,否則進行3。
- 如果讀線程不用等待(實際上是是否需要公平鎖),并且增加讀取鎖狀態數成功,那么就返回成功,否則進行4。
- 步驟3失敗的原因是CAS操作修改狀態數失敗,那么就需要循環不斷嘗試去修改狀態直到成功或者鎖被寫入線程占有。實際上是過程3的不斷嘗試直到CAS計數成功或者被寫入線程占有鎖。
在清單7 中有一個對象HoldCounter,這里暫且不提這是什么結構和為什么存在這樣一個結構。
接下來根據清單8 我們來看如何釋放一個讀取鎖。同樣先不理HoldCounter,關鍵的在于for循環里面,其實就是一個不斷嘗試的CAS操作,直到修改狀態成功。前面說過state的高16位描述的共享鎖(讀取鎖)的數量,所以每次都需要減去2^16,這樣就相當于讀取鎖數量減1。實際上SHARED_UNIT=1<<16。
清單8 讀取鎖釋放過程
protected final boolean tryReleaseShared(int unused) {
HoldCounter rh = cachedHoldCounter;
Thread current = Thread.currentThread();
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
if (rh.tryDecrement() <= 0)
throw new IllegalMonitorStateException();
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
好了,現在回頭看HoldCounter到底是一個什么東西。首先我們可以看到只有在獲取共享鎖(讀取鎖)的時候加1,也只有在釋放共享鎖的時候減1有作用,并且在釋放鎖的時候拋出了一個IllegalMonitorStateException異常。而我們知道IllegalMonitorStateException通常描述的是一個線程操作一個不屬于自己的監視器對象的引發的異常。也就是說這里的意思是一個線程釋放了一個不屬于自己或者不存在的共享鎖。
前面的章節中一再強調,對于共享鎖,其實并不是鎖的概念,更像是計數器的概念。一個共享鎖就相對于一次計數器操作,一次獲取共享鎖相當于計數器加1,釋放一個共享鎖就相當于計數器減1。顯然只有線程持有了共享鎖(也就是當前線程攜帶一個計數器,描述自己持有多少個共享鎖或者多重共享鎖),才能釋放一個共享鎖。否則一個沒有獲取共享鎖的線程調用一次釋放操作就會導致讀寫鎖的state(持有鎖的線程數,包括重入數)錯誤。
明白了HoldCounter的作用后我們就可以猜到它的作用其實就是當前線程持有共享鎖(讀取鎖)的數量,包括重入的數量。那么這個數量就必須和線程綁定在一起。
在Java里面將一個對象和線程綁定在一起,就只有ThreadLocal才能實現了。所以毫無疑問HoldCounter就應該是綁定到線程上的一個計數器。
清單9 線程持有讀取鎖數量的計數器
static final class HoldCounter {
int count;
final long tid = Thread.currentThread().getId();
int tryDecrement() {
int c = count;
if (c > 0)
count = c - 1;
return c;
}
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
清單9 描述的是線程持有讀取鎖數量的計數器。可以看到這里使用ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock里面緩存的上一個讀取線程(cachedHoldCounter)是否是當前線程。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的原因是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(盡管GC能夠智能的發現這種引用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助GC快速回收對象而已。
除了readLock()和writeLock()外,Lock對象還允許tryLock(),那么ReadLock和WriteLock的tryLock()不一樣。清單10 和清單11 分別描述了讀取鎖的tryLock()和寫入鎖的tryLock()。
讀取鎖tryLock()也就是tryReadLock()成功的條件是:沒有寫入鎖或者寫入鎖是當前線程,并且讀線程共享鎖數量沒有超過65535個。
寫入鎖tryLock()也就是tryWriteLock()成功的條件是: 沒有寫入鎖或者寫入鎖是當前線程,并且嘗試一次修改state成功。
清單10 讀取鎖的tryLock()
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
rh.count++;
return true;
}
}
}
清單11 寫入鎖的tryLock()
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 ||current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
整個讀寫鎖的邏輯大概就這么多,其實真正研究起來也不是很復雜,真正復雜的東西都在AQS里面。
鎖部分的原理和思想都介紹完了,下一節里面會對鎖機進行小節,并對線程并發也會有一些簡單的小節。
©2009-2014 IMXYLZ
|求賢若渴