Util.concurrent工具包概述
Doug Lea
State University of New York at Oswego
dl@cs.oswego.edu
http://gee.cs.oswego.edu
翻譯:
Cocia Lin(cocia@163.com)
Huihoo.org
原文
http://gee.cs.oswego.edu/dl/cpjslides/util.pdf
要點
–目標和結構
–主要的接口和實現(xiàn)
Sync:獲得/釋放(acquire/release) 協(xié)議
Channel:放置/取走(put/take) 協(xié)議
Executor:執(zhí)行Runnable任務
–每一個部分都有一些關聯(lián)的接口和支持類
–簡單的涉及其他的類和特性
目標
–一些簡單的接口
-但是覆蓋大部分程序員需要小心處理代碼的問題
– 高質量實現(xiàn)
-正確的,保守的,有效率的,可移植的
–可能作為將來標準的基礎
-獲取經(jīng)驗和收集反饋信息
Sync
– acquire/release協(xié)議的主要接口
-用來定制鎖,資源管理,其他的同步用途
- 高層抽象接口
- 沒有區(qū)分不同的加鎖用法
–實現(xiàn)
-Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore, PrioritySemaphore
n 還有,有幾個簡單的實現(xiàn),例如ObservableSync, LayeredSync
獨占鎖
try {
lock.acquire();
try {
action();
}
finally {
lock.release();
}
}
catch (InterruptedException ie) { ... }
– Java同步塊不適用的時候使用它
- 超時,回退(back-off)
- 確保可中斷
- 大量迅速鎖定
- 創(chuàng)建Posix風格應用(condvar)
獨占例子
class ParticleUsingMutex {
int x; int y;
final Random rng = new Random();
final Mutex mutex = new Mutex();
public void move() {
try {
mutex.acquire();
try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }
finally { mutex.release(); }
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt(); }
}
public void draw(Graphics g) {
int lx, ly;
try {
mutex.acquire();
try { lx = x; ly = y; }
finally { mutex.release(); }
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt(); return; }
g.drawRect(lx, ly, 10, 10);
}
}
回退(Backoff)例子
class CellUsingBackoff {
private long val;
private final Mutex mutex = new Mutex();
void swapVal(CellUsingBackoff other)
throws InterruptedException {
if (this == other) return; // alias check
for (;;) {
mutex.acquire();
try {
I f (other.mutex.attempt(0)) {
try {
long t = val;
val = other.val;
other.val = t;
return;
}
finally { other.mutex.release(); }
}
}
finally { mutex.release(); };
Thread.sleep(100); // heuristic retry interval
}
}
}
讀寫鎖
interface ReadWriteLock {
Sync readLock();
Sync writeLock();
}
– 管理一對鎖
- 和普通的鎖一樣的使用習慣
– 對集合類很有用
-半自動的方式實現(xiàn)SyncSet, SyncMap, …
– 實現(xiàn)者使用不同的鎖策略
- WriterPreference, ReentrantWriterPreference,
ReaderPreference, FIFO
ReadWriteLock例子
– 示范在讀寫鎖中執(zhí)行任何Runnable的包裝類
class WithRWLock {
final ReadWriteLock rw;
public WithRWLock(ReadWriteLock l) { rw = l; }
public void performRead(Runnable readCommand)
throws InterruptedException {
rw.readLock().acquire();
try { readCommand.run(); }
finally { rw.readlock().release(); }
}
public void performWrite(…) // similar
}
閉鎖(Latch)
– 閉鎖是開始時設置為false,但一旦被設置為true,他將永遠保持true狀態(tài)
- 初始化標志
- 流結束定位
- 線程中斷
- 事件出發(fā)指示器
– CountDown和他有點類似,不同的是,CountDown需要一定數(shù)量的觸發(fā)設置,而不是一次
– 非常簡單,但是廣泛使用的類
- 替換容易犯錯的開發(fā)代碼
Latch Example 閉鎖例子
class Worker implements Runnable {
Latch startSignal;
Worker(Latch l) { startSignal = l; }
public void run() {
startSignal.acquire();
// … doWork();
}
}
class Driver { // …
void main() {
Latch ss = new Latch();
for (int i = 0; i < N; ++i) // make threads
new Thread(new Worker(ss)).start();
doSomethingElse(); // don’t let run yet
ss.release(); // now let all threads proceed
}
}
信號(Semaphores)
-- 服務于數(shù)量有限的占有者
- 使用許可數(shù)量構造對象(通常是0)
- 如果需要一個許可才能獲取,等待,然后取走一個許可
- 釋放的時候將許可添加回來
-- 但是真正的許可并沒有轉移(But no actual permits change hands.)
- 信號量僅僅保留當前的計數(shù)值
-- 應用程序
- 鎖:一個信號量可以被用作互斥體(mutex)
- 一個獨立的等待緩存或者資源控制的操作
- 設計系統(tǒng)是想忽略底層的系統(tǒng)信號
-- (phores ‘remember’ past signals)記住已經(jīng)消失的信號量
信號量例子
class Pool {
ArrayList items = new ArrayList();
HashSet busy = new HashSet();
final Semaphore available;
public Pool(int n) {
available = new Semaphore(n);
// … somehow initialize n items …;
}
public Object getItem() throws InterruptedException {
available.acquire();
return doGet();
}
public void returnItem(Object x) {
if (doReturn(x)) available.release();
}
synchronized Object doGet() {
Object x = items.remove(items.size()-1);
busy.add(x); // put in set to check returns
return x;
}
synchronized boolean doReturn(Object x) {
return busy.remove(x); // true if was present
}
}
屏障(Barrier)
– 多部分同步接口
- 每一部分都必須等待其他的分不撞倒屏障
– CyclicBarrier類
- CountDown的一個可以重新設置的版本
- 對于反復劃分算法很有用(iterative partitioning algorithms)
– Rendezvous類
- 一個每部分都能夠和其他部分交換信息的屏障
- 行為類似同時的在一個同步通道上put和take
- 對于資源交換協(xié)議很有用(resource-exchange protocols)
通道(Channel)
–為緩沖,隊列等服務的主接口
– 具體實現(xiàn)
- LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot
通道屬性
– 被定義為Puttable和Takable的子接口
- 允許安裝生產(chǎn)者/消費者模式執(zhí)行
– 支持可超時的操作offer和poll
- 當超時值是0時,可能會被阻塞
- 所有的方法能夠拋出InterruptedException異常
– 沒有接口需要size方法
- 但是一些實現(xiàn)定義了這個方法
- BoundedChannel有capacity方法
通道例子
class Service { // …
final Channel msgQ = new LinkedQueue();
public void serve() throws InterruptedException {
String status = doService();
msgQ.put(status);
}
public Service() { // start background thread
Runnable logger = new Runnable() {
public void run() {
try {
for(;;)
System.out.println(msqQ.take());
}
catch(InterruptedException ie) {} }
};
new Thread(logger).start();
}
}
運行器(Executor)
– 類似線程的類的主接口
- 線程池
- 輕量級運行框架
- 可以定制調度算法
– 只需要支持execute(Runnable r)
- 同Thread.start類似
– 實現(xiàn)
- PooledExecutor, ThreadedExecutor,QueuedExecutor, FJTaskRunnerGroup
- 相關的ThreadFactory類允許大多數(shù)的運行器通過定制屬性使用線程
PooledExecutor
– 一個可調的工作者線程池,可修改得屬性如下:
- 任務隊列的類型
- 最大線程數(shù)
- 最小線程數(shù)
- 預熱(預分配)和立即(分配)線程
- 保持活躍直到工作線程結束
– 以后如果需要可能被一個新的代替
- 飽和(Saturation)協(xié)議
– 阻塞,丟棄,生產(chǎn)者運行,等等
PooledExecutor例子
class WebService {
public static void main(String[] args) {
PooledExecutor pool =
new PooledExecutor(new BoundedBuffer(10), 20);
pool.createThreads(4);
try {
ServerSocket socket = new ServerSocket(9999);
for (;;) {
final Socket connection = socket.accept();
pool.execute(new Runnable() {
public void run() {
new Handler().process(connection);
}});
}
}
catch(Exception e) { } // die
}
}
class Handler { void process(Socket s); }
前景(Future)和可調用(Callable)
– Callabe是類似于Runnable的接口,用來作為參數(shù)和傳遞結果
interface Callable {
Object call(Object arg) throws Exception;
}
– FutureResult管理Callable的異步執(zhí)行
class FutureResult { // …
// block caller until result is ready
public Object get()
throws InterruptedException, InvocationTargetException;
public void set(Object result); // unblocks get
// create Runnable that can be used with an Executor
public Runnable setter(Callable function);
}
FutureResult例子
class ImageRenderer { Image render(byte[] raw); }
class App { // …
Executor executor = …; // any executor
ImageRenderer renderer = new ImageRenderer();
public void display(byte[] rawimage) {
try {
FutureResult futureImage = new FutureResult();
Runnable cmd = futureImage.setter(new Callable(){
public Object call() {
return renderer.render(rawImage);
}});
executor.execute(cmd);
drawBorders(); // do other things while executing
drawCaption();
drawImage((Image)(futureImage.get())); // use future
}
catch (Exception ex) {
cleanup();
return;
}
}
}
其他的類
– CopyOnWriteArrayList
- 支持整個集合復制時每一個修改的無鎖訪問
- 適合大多數(shù)的多路廣播應用程序
– 工具包還包括了一個java.beans多路廣播類的COW版本
– SynchronizedDouble, SynchronizedInt,SynchronizedRef, etc
- 類似于java.lang.Double,提供可變操作的同步版本.例如,addTo,inc
- 添加了一些象swap,commit這樣的實用操作
未來計劃
– 并發(fā)數(shù)據(jù)構架
- 一組繁重線程連接環(huán)境下有用的工具集合
–支持側重I/O的程序
- 事件機制的IO系統(tǒng)
– 小版本的實現(xiàn)
- 例如SingleSourceQueue
–小幅度的改善
- 使運行器更容易使用
– 替換
- JDK1.3 java.util.Timer 被ClockDaemon取代