Semaphore 是一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire()
,然后再獲取該許可。每個 release()
添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore
只對可用許可的號碼進行計數,并采取相應的行動。
說白了,Semaphore是一個計數器,在計數器不為0的時候對線程就放行,一旦達到0,那么所有請求資源的新線程都會被阻塞,包括增加請求到許可的線程,也就是說Semaphore不是可重入的。每一次請求一個許可都會導致計數器減少1,同樣每次釋放一個許可都會導致計數器增加1,一旦達到了0,新的許可請求線程將被掛起。
緩存池整好使用此思想來實現的,比如鏈接池、對象池等。
清單1 對象池
package xylz.study.concurrency.lock;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ObjectCache<T> {
public interface ObjectFactory<T> {
T makeObject();
}
class Node {
T obj;
Node next;
}
final int capacity;
final ObjectFactory<T> factory;
final Lock lock = new ReentrantLock();
final Semaphore semaphore;
private Node head;
private Node tail;
public ObjectCache(int capacity, ObjectFactory<T> factory) {
this.capacity = capacity;
this.factory = factory;
this.semaphore = new Semaphore(this.capacity);
this.head = null;
this.tail = null;
}
public T getObject() throws InterruptedException {
semaphore.acquire();
return getNextObject();
}
private T getNextObject() {
lock.lock();
try {
if (head == null) {
return factory.makeObject();
} else {
Node ret = head;
head = head.next;
if (head == null) tail = null;
ret.next = null;//help GC
return ret.obj;
}
} finally {
lock.unlock();
}
}
private void returnObjectToPool(T t) {
lock.lock();
try {
Node node = new Node();
node.obj = t;
if (tail == null) {
head = tail = node;
} else {
tail.next = node;
tail = node;
}
} finally {
lock.unlock();
}
}
public void returnObject(T t) {
returnObjectToPool(t);
semaphore.release();
}
}
清單1描述了一個基于信號量Semaphore的對象池實現。此對象池最多支持capacity個對象,這在構造函數中傳入。對象池有一個基于FIFO的隊列,每次從對象池的頭結點開始取對象,如果頭結點為空就直接構造一個新的對象返回。否則將頭結點對象取出,并且頭結點往后移動。特別要說明的如果對象的個數用完了,那么新的線程將被阻塞,直到有對象被返回回來。返還對象時將對象加入FIFO的尾節點并且釋放一個空閑的信號量,表示對象池中增加一個可用對象。
實際上對象池、線程池的原理大致上就是這樣的,只不過真正的對象池、線程池要處理比較復雜的邏輯,所以實現起來還需要做很多的工作,例如超時機制,自動回收機制,對象的有效期等等問題。
這里特別說明的是信號量只是在信號不夠的時候掛起線程,但是并不能保證信號量足夠的時候獲取對象和返還對象是線程安全的,所以在清單1中仍然需要鎖Lock來保證并發的正確性。
將信號量初始化為 1,使得它在使用時最多只有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進制信號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信號量具有某種屬性(與很多 Lock
實現不同),即可以由線程釋放“鎖”,而不是由所有者(因為信號量沒有所有權的概念)。在某些專門的上下文(如死鎖恢復)中這會很有用。
上面這段話的意思是說當某個線程A持有信號量數為1的信號量時,其它線程只能等待此線程釋放資源才能繼續,這時候持有信號量的線程A就相當于持有了“鎖”,其它線程的繼續就需要這把鎖,于是線程A的釋放才能決定其它線程的運行,相當于扮演了“鎖”的角色。
另外同公平鎖非公平鎖一樣,信號量也有公平性。如果一個信號量是公平的表示線程在獲取信號量時按FIFO的順序得到許可,也就是按照請求的順序得到釋放。這里特別說明的是:所謂請求的順序是指在請求信號量而進入FIFO隊列的順序,有可能某個線程先請求信號而后進去請求隊列,那么次線程獲取信號量的順序就會晚于其后請求但是先進入請求隊列的線程。這個在公平鎖和非公平鎖中談過很多。
除了acquire以外,Semaphore還有幾種類似的acquire方法,這些方法可以更好的處理中斷和超時或者異步等特性,可以參考JDK API。
按照同樣的學習原則,下面對主要的實現進行分析。Semaphore的acquire方法實際上訪問的是AQS的acquireSharedInterruptibly(arg)方法。這個可以參考CountDownLatch一節或者AQS一節。
所以Semaphore的await實現也是比較簡單的。與CountDownLatch不同的是,Semaphore區分公平信號和非公平信號。
清單2 公平信號獲取方法
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
for (;;) {
Thread first = getFirstQueuedThread();
if (first != null && first != current)
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
清單3 非公平信號獲取方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
對比清單2和清單3可以看到,公平信號和非公平信號在于第一次嘗試能否獲取信號時,公平信號量總是將當前線程進入AQS的CLH隊列進行排隊(因為第一次嘗試時隊列的頭結點線程很有可能不是當前線程,當然不排除同一個線程第二次進入信號量),從而根據AQS的CLH隊列的順序FIFO依次獲取信號量;而對于非公平信號量,第一次立即嘗試能否拿到信號量,一旦信號量的剩余數available大于請求數(acquires通常為1),那么線程就立即得到了釋放,而不需要進行AQS隊列進行排隊。只有remaining<0的時候(也就是信號量不夠的時候)才會進入AQS隊列。
所以非公平信號量的吞吐量總是要比公平信號量的吞吐量要大,但是需要強調的是非公平信號量和非公平鎖一樣存在“饑渴死”的現象,也就是說活躍線程可能總是拿到信號量,而非活躍線程可能難以拿到信號量。而對于公平信號量由于總是靠請求的線程的順序來獲取信號量,所以不存在此問題。
參考資料:
- 信號量(Semaphore)在生產者和消費者模式的使用
- What is mutex and semaphore in Java ? What is the main difference ?
- 關于 java.util.concurrent 您不知道的 5 件事,第 2 部分
- Semahores
©2009-2014 IMXYLZ
|求賢若渴