可以在對中對元素進行配對和交換的線程的同步點。每個線程將條目上的某個方法呈現給 exchange
方法,與伙伴線程進行匹配,并且在返回時接收其伙伴的對象。Exchanger 可能被視為 SynchronousQueue
的雙向形式。
換句話說Exchanger提供的是一個交換服務,允許原子性的交換兩個(多個)對象,但同時只有一對才會成功。先看一個簡單的實例模型。

在上面的模型中,我們假定一個空的棧(Stack),棧頂(Top)當然是沒有元素的。同時我們假定一個數據結構Node,包含一個要交換的元素E和一個要填充的“洞”Node。這時線程T1攜帶節點node1進入棧(cas_push),當然這是CAS操作,這樣棧頂就不為空了。線程T2攜帶節點node2進入棧,發現棧里面已經有元素了node1,同時發現node1的hold(Node)為空,于是將自己(node2)填充到node1的hold中(cas_fill)。然后將元素node1從棧中彈出(cas_take)。這樣線程T1就得到了node1.hold.item也就是node2的元素e2,線程T2就得到了node1.item也就是e1,從而達到了交換的目的。
算法描述就是下圖展示的內容。

JDK 5就是采用類似的思想實現的Exchanger。JDK 6以后為了支持多線程多對象同時Exchanger了就進行了改造(為了支持更好的并發),采用ConcurrentHashMap的思想,將Stack分割成很多的片段(或者說插槽Slot),線程Id(Thread.getId())hash相同的落在同一個Slot上,這樣在默認32個Slot上就有很好的吞吐量。當然會根據機器CPU內核的數量有一定的優化,有興趣的可以去了解下Exchanger的源碼。
至于Exchanger的使用,在JDK文檔上有個例子,講述的是兩個線程交換數據緩沖區的例子(實際上仍然可以認為是生產者/消費者模型)。
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer =
a made-up type
DataBuffer initialFullBuffer = 
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) {
handle
}
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) {
handle
}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
Exchanger實現的是一種數據分片的思想,這在大數據情況下將數據分成一定的片段并且多線程執行的情況下有一定的使用價值。
最近一直推托工作忙,更新頻度越來越低了,好在現在的工作還有點個人時間,以后爭取多更新下吧,至少也要把這個專輯寫完。
©2009-2014 IMXYLZ
|求賢若渴