本小節(jié)介紹鎖釋放Lock.unlock()。
Release/TryRelease
unlock操作實(shí)際上就調(diào)用了AQS的release操作,釋放持有的鎖。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
前面提到過tryRelease(arg)操作,此操作里面總是嘗試去釋放鎖,如果成功,說明鎖確實(shí)被當(dāng)前線程持有,那么就看AQS隊(duì)列中的頭結(jié)點(diǎn)是否為空并且能否被喚醒,如果可以的話就喚醒繼任節(jié)點(diǎn)(下一個(gè)非CANCELLED節(jié)點(diǎn),下面會(huì)具體分析)。
對(duì)于獨(dú)占鎖而言,java.util.concurrent.locks.ReentrantLock.Sync.tryRelease(int)展示了如何嘗試釋放鎖(tryRelease)操作。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
整個(gè)tryRelease操作是這樣的:
- 判斷持有鎖的線程是否是當(dāng)前線程,如果不是就拋出IllegalMonitorStateExeception(),因?yàn)橐粋€(gè)線程是不能釋放另一個(gè)線程持有的鎖(否則鎖就失去了意義)。否則進(jìn)行2。
- 將AQS狀態(tài)位減少要釋放的次數(shù)(對(duì)于獨(dú)占鎖而言總是1),如果剩余的狀態(tài)位0(也就是沒有線程持有鎖),那么當(dāng)前線程就是最后一個(gè)持有鎖的線程,清空AQS持有鎖的獨(dú)占線程。進(jìn)行3。
- 將剩余的狀態(tài)位寫回AQS,如果沒有線程持有鎖就返回true,否則就是false。
參考上一節(jié)的分析就可以知道,這里c==0決定了是否完全釋放了鎖。由于ReentrantLock是可重入鎖,因此同一個(gè)線程可能多重持有鎖,那么當(dāng)且僅當(dāng)最后一個(gè)持有鎖的線程釋放鎖是才能將AQS中持有鎖的獨(dú)占線程清空,這樣接下來的操作才需要喚醒下一個(gè)需要鎖的AQS節(jié)點(diǎn)(Node),否則就只是減少鎖持有的計(jì)數(shù)器,并不能改變其他操作。
當(dāng)tryRelease操作成功后(也就是完全釋放了鎖),release操作才能檢查是否需要喚醒下一個(gè)繼任節(jié)點(diǎn)。這里的前提是AQS隊(duì)列的頭結(jié)點(diǎn)需要鎖(waitStatus!=0),如果頭結(jié)點(diǎn)需要鎖,就開始檢測(cè)下一個(gè)繼任節(jié)點(diǎn)是否需要鎖操作。
在上一節(jié)中說道acquireQueued操作完成后(拿到了鎖),會(huì)將當(dāng)前持有鎖的節(jié)點(diǎn)設(shè)為頭結(jié)點(diǎn),所以一旦頭結(jié)點(diǎn)釋放鎖,那么就需要尋找頭結(jié)點(diǎn)的下一個(gè)需要鎖的繼任節(jié)點(diǎn),并喚醒它。
private void unparkSuccessor(Node node) {
//此時(shí)node是需要是需要釋放鎖的頭結(jié)點(diǎn)
//清空頭結(jié)點(diǎn)的waitStatus,也就是不再需要鎖了
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
//從頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)開始尋找繼任節(jié)點(diǎn),當(dāng)且僅當(dāng)繼任節(jié)點(diǎn)的waitStatus<=0才是有效繼任節(jié)點(diǎn),否則將這些waitStatus>0(也就是CANCELLED的節(jié)點(diǎn))從AQS隊(duì)列中剔除
//這里并沒有從head->tail開始尋找,而是從tail->head尋找最后一個(gè)有效節(jié)點(diǎn)。
//解釋在這里 http://www.tkk7.com/xylz/archive/2010/07/08/325540.html#377512
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果找到一個(gè)有效的繼任節(jié)點(diǎn),就喚醒此節(jié)點(diǎn)線程
if (s != null)
LockSupport.unpark(s.thread);
}
這里再一次把acquireQueued的過程找出來。對(duì)比unparkSuccessor,一旦頭節(jié)點(diǎn)的繼任節(jié)點(diǎn)被喚醒,那么繼任節(jié)點(diǎn)就會(huì)嘗試去獲取鎖(在acquireQueued中node就是有效的繼任節(jié)點(diǎn),p就是喚醒它的頭結(jié)點(diǎn)),如果成功就會(huì)將頭結(jié)點(diǎn)設(shè)置為自身,并且將頭結(jié)點(diǎn)的前任節(jié)點(diǎn)清空,這樣前任節(jié)點(diǎn)(已經(jīng)過時(shí)了)就可以被GC釋放了。
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
在setHead中,將頭結(jié)點(diǎn)的前任節(jié)點(diǎn)清空并且將頭結(jié)點(diǎn)的線程清空就是為了更好的GC,防止內(nèi)存泄露。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
對(duì)比lock()操作,unlock()操作還是比較簡(jiǎn)單的,主要就是釋放響應(yīng)的資源,并且喚醒AQS隊(duì)列中有效的繼任節(jié)點(diǎn)。這樣所就按照請(qǐng)求的順序去嘗試獲取鎖了。
整個(gè)lock()/unlock()過程完成了,我們?cè)倩仡^看公平鎖(FairSync)和非公平鎖(NonfairSync)。
公平鎖和非公平鎖只是在獲取鎖的時(shí)候有差別,其它都是一樣的。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在上面非公平鎖的代碼中總是優(yōu)先嘗試當(dāng)前是否有線程持有鎖,一旦沒有任何線程持有鎖,那么非公平鎖就霸道的嘗試將鎖“占為己有”。如果在搶占鎖的時(shí)候失敗就和公平鎖一樣老老實(shí)實(shí)的去排隊(duì)。
也即是說公平鎖和非公平鎖只是在入AQS的CLH隊(duì)列之前有所差別,一旦進(jìn)入了隊(duì)列,所有線程都是按照隊(duì)列中先來后到的順序請(qǐng)求鎖。
Condition
條件變量很大一個(gè)程度上是為了解決Object.wait/notify/notifyAll難以使用的問題。
條件(也稱為條件隊(duì)列 或條件變量)為線程提供了一個(gè)含義,以便在某個(gè)狀態(tài)條件現(xiàn)在可能為 true 的另一個(gè)線程通知它之前,一直掛起該線程(即讓其“等待”)。因?yàn)樵L問此共享狀態(tài)信息發(fā)生在不同的線程中,所以它必須受保護(hù),因此要將某種形式的鎖與該條件相關(guān)聯(lián)。等待提供一個(gè)條件的主要屬性是:以原子方式 釋放相關(guān)的鎖,并掛起當(dāng)前線程,就像 Object.wait
做的那樣。
上述API說明表明條件變量需要與鎖綁定,而且多個(gè)Condition需要綁定到同一鎖上。前面的Lock中提到,獲取一個(gè)條件變量的方法是Lock.newCondition()。
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
以上是Condition接口定義的方法,await*對(duì)應(yīng)于Object.wait,signal對(duì)應(yīng)于Object.notify,signalAll對(duì)應(yīng)于Object.notifyAll。特別說明的是Condition的接口改變名稱就是為了避免與Object中的wait/notify/notifyAll的語義和使用上混淆,因?yàn)镃ondition同樣有wait/notify/notifyAll方法。
每一個(gè)Lock可以有任意數(shù)據(jù)的Condition對(duì)象,Condition是與Lock綁定的,所以就有Lock的公平性特性:如果是公平鎖,線程為按照FIFO的順序從Condition.await中釋放,如果是非公平鎖,那么后續(xù)的鎖競(jìng)爭(zhēng)就不保證FIFO順序了。
一個(gè)使用Condition實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的模型例子如下。
package xylz.study.concurrency.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProductQueue<T> {
private final T[] items;
private final Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
//
private int head, tail, count;
public ProductQueue(int maxSize) {
items = (T[]) new Object[maxSize];
}
public ProductQueue() {
this(10);
}
public void put(T t) throws InterruptedException {
lock.lock();
try {
while (count == getCapacity()) {
notFull.await();
}
items[tail] = t;
if (++tail == getCapacity()) {
tail = 0;
}
++count;
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T ret = items[head];
items[head] = null;//GC
//
if (++head == getCapacity()) {
head = 0;
}
--count;
notFull.signalAll();
return ret;
} finally {
lock.unlock();
}
}
public int getCapacity() {
return items.length;
}
public int size() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
在這個(gè)例子中消費(fèi)take()需要 隊(duì)列不為空,如果為空就掛起(await()),直到收到notEmpty的信號(hào);生產(chǎn)put()需要隊(duì)列不滿,如果滿了就掛起(await()),直到收到notFull的信號(hào)。
可能有人會(huì)問題,如果一個(gè)線程lock()對(duì)象后被掛起還沒有unlock,那么另外一個(gè)線程就拿不到鎖了(lock()操作會(huì)掛起),那么就無法通知(notify)前一個(gè)線程,這樣豈不是“死鎖”了?
await* 操作
上一節(jié)中說過多次ReentrantLock是獨(dú)占鎖,一個(gè)線程拿到鎖后如果不釋放,那么另外一個(gè)線程肯定是拿不到鎖,所以在lock.lock()和lock.unlock()之間可能有一次釋放鎖的操作(同樣也必然還有一次獲取鎖的操作)。我們?cè)倩仡^看代碼,不管take()還是put(),在進(jìn)入lock.lock()后唯一可能釋放鎖的操作就是await()了。也就是說await()操作實(shí)際上就是釋放鎖,然后掛起線程,一旦條件滿足就被喚醒,再次獲取鎖!
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
上面是await()的代碼片段。上一節(jié)中說過,AQS在獲取鎖的時(shí)候需要有一個(gè)CHL的FIFO隊(duì)列,所以對(duì)于一個(gè)Condition.await()而言,如果釋放了鎖,要想再一次獲取鎖那么就需要進(jìn)入隊(duì)列,等待被通知獲取鎖。完整的await()操作是安裝如下步驟進(jìn)行的:
- 將當(dāng)前線程加入Condition鎖隊(duì)列。特別說明的是,這里不同于AQS的隊(duì)列,這里進(jìn)入的是Condition的FIFO隊(duì)列。后面會(huì)具體談到此結(jié)構(gòu)。進(jìn)行2。
- 釋放鎖。這里可以看到將鎖釋放了,否則別的線程就無法拿到鎖而發(fā)生死鎖。進(jìn)行3。
- 自旋(while)掛起,直到被喚醒或者超時(shí)或者CACELLED等。進(jìn)行4。
- 獲取鎖(acquireQueued)。并將自己從Condition的FIFO隊(duì)列中釋放,表明自己不再需要鎖(我已經(jīng)拿到鎖了)。
這里再回頭介紹Condition的數(shù)據(jù)結(jié)構(gòu)。我們知道一個(gè)Condition可以在多個(gè)地方被await*(),那么就需要一個(gè)FIFO的結(jié)構(gòu)將這些Condition串聯(lián)起來,然后根據(jù)需要喚醒一個(gè)或者多個(gè)(通常是所有)。所以在Condition內(nèi)部就需要一個(gè)FIFO的隊(duì)列。
private transient Node firstWaiter;
private transient Node lastWaiter;
上面的兩個(gè)節(jié)點(diǎn)就是描述一個(gè)FIFO的隊(duì)列。我們?cè)俳Y(jié)合前面提到的節(jié)點(diǎn)(Node)數(shù)據(jù)結(jié)構(gòu)。我們就發(fā)現(xiàn)Node.nextWaiter就派上用場(chǎng)了!nextWaiter就是將一系列的Condition.await*串聯(lián)起來組成一個(gè)FIFO的隊(duì)列。
signal/signalAll 操作
await*()清楚了,現(xiàn)在再來看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要將Condition.await*()中FIFO隊(duì)列中第一個(gè)Node喚醒(或者全部Node)喚醒。盡管所有Node可能都被喚醒,但是要知道的是仍然只有一個(gè)線程能夠拿到鎖,其它沒有拿到鎖的線程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
上面的代碼很容易看出來,signal就是喚醒Condition隊(duì)列中的第一個(gè)非CANCELLED節(jié)點(diǎn)線程,而signalAll就是喚醒所有非CANCELLED節(jié)點(diǎn)線程。當(dāng)然了遇到CANCELLED線程就需要將其從FIFO隊(duì)列中剔除。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int c = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
上面就是喚醒一個(gè)await*()線程的過程,根據(jù)前面的小節(jié)介紹的,如果要unpark線程,并使線程拿到鎖,那么就需要線程節(jié)點(diǎn)進(jìn)入AQS的隊(duì)列。所以可以看到在LockSupport.unpark之前調(diào)用了enq(node)操作,將當(dāng)前節(jié)點(diǎn)加入到AQS隊(duì)列。
整個(gè)鎖機(jī)制的原理就介紹完了,從下一節(jié)開始就進(jìn)入了鎖機(jī)制的應(yīng)用了。
©2009-2014 IMXYLZ
|求賢若渴