9.?? 同步(Concurrent)
????
1.????? Executor接口
???? Executor接口提供了一個(gè)類似于線程池的管理工具。用于只需要往Executor中提交Runnable對象,剩下的啟動(dòng)線程等工作,都會(huì)有對應(yīng)的實(shí)現(xiàn)類來完成。ScheduledExecutorService比ExecutorService增加了,時(shí)間上的控制,即用戶可以在提交的時(shí)候額外的定義該任務(wù)的啟動(dòng)時(shí)機(jī),以及隨后的執(zhí)行間隔和延遲等。
???? 例子:
???? 任務(wù):
???? public class ETask implements Runnable{
????????? private int id = 0;
????????? public ETask(int id){
?????????????? this.id = id;
????????? }
????????? public void run(){
?????????????? try{
?????????????????? System.out.println(id+" Start");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Do");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Exit");
????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }
???? }
????
???? 測試類:
???? public class ETest{
????????? public static void main(String[] args){???????
????????????? ExecutorService executor = Executors.newFixedThreadPool(2);
????????????? for(int i=0;i<5;i++){
?????????????????? Runnable r = new ETask(i);
?????????????????? executor.execute(r);
????????????? }
????????????? executor.shutdown();
????????? }
???? }
?
???? 輸出:
???? 0 Start
???? 1 Start
???? 0 Do
???? 1 Do
???? 0 Exit
???? 2 Start
???? 1 Exit
???? 3 Start
???? 2 Do
???? 3 Do
???? 2 Exit
???? 3 Exit
???? 4 Start
???? 4 Do
???? 4 Exit
?
2.????? Future和Callable
???? Callable是一個(gè)類似于Runnable的接口,他與Runnable的區(qū)別是,她在執(zhí)行完畢之后能夠返回結(jié)果。Future用于獲取線程的執(zhí)行結(jié)果,或者取消已向Executor的任務(wù)。當(dāng)我們通過Future提供的get()方法獲取任務(wù)的執(zhí)行結(jié)果時(shí),如果任務(wù)沒有完成,則調(diào)用get()方法的線程將會(huì)被阻塞,知道任務(wù)完成為止。一般我們都會(huì)使用Future的實(shí)現(xiàn)類FutureTask。
???? 例子:
???? Callable對象:
???? public class ETask implements Callable{
????????? private String id = null;
????????? public ETask(String id){
?????????????? this.id = id;
????????? }
????
????????? public String call(){
????????????? try{
?????????????????? System.out.println(id+" Start");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Do");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Exit");??????????
????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????????? return id;
????????? }
???? }
?
???? 測試類:
???? public class ETest{
????????? public static void main(String[] args){???????
????????????? ExecutorService executor = Executors.newFixedThreadPool(2);
????????????? for(int i=0;i<5;i++){???????????
?????????????????? try{
?????????????????????? Callable c = new ETask(String.valueOf(i));
??????????????????????? FutureTask ft = new FutureTask(c);
??????????????????????? executor.execute(ft);
??????????????????????? System.out.println("Finish:" + ft.get());?????????
?????????????????? }catch(Exception e){
?????????????????????? e.printStackTrace();
?????????????????? }
????????????? }
?????????????? executor.shutdown();
????????? }
???? }
?
???? 輸出:
???? 0 Start
???? 0 Do
???? 0 Exit
???? Finish:0
???? 1 Start
???? 1 Do
???? 1 Exit
???? Finish:1
???? 2 Start
???? …
3.????? CompletionService和ExecutorCompletionService
???? CompletionService類似于一個(gè)Executor和Queue的混合。我們可以通過submit()向CompletionService提交任務(wù),然后通過poll()來獲取第一個(gè)完成的任務(wù),也可以通過take()來阻塞等待下一個(gè)完成的任務(wù)。ExecutorCompletionService是CompletionService的實(shí)現(xiàn)類,他需要提供一個(gè)Executor作為構(gòu)造函數(shù)的參數(shù)。
???? 例子:
???? Executor executor = …;
???? CompletionService cs = new ExecutorCompletionService(executor);
???? Future fs = cs.submit(…);
???? Future ft = cs.take();
?
4.????? Semaphore
???? 信號(hào)量是用于同步和互斥的低級原語。信號(hào)量提供的acquire()和release()操作,與操作系統(tǒng)上的p,v操作同。
???? 例子:
???? 緩沖區(qū):
???? public class Buffer{
????????? private Semaphore s = null;
????????? private Semaphore p = null;
????????? Vector<Integer> v = new Vector<Integer>();
?????????
????????? public Buffer(int capacity){
?????????????? s = new Semaphore(capacity);
????????????? p = new Semaphore(0);
????????? }
????
????????? public void put(int i){
????????????? try{
?????????????????? s.acquire();
?????????????????? v.add(new Integer(i));
?????????????????? p.release();
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }
????
????????? public int get(){?
?????????????? int i = 0;
????????????? try{
?????????????????? p.acquire();
?????????????????? i = ((Integer)v.remove(0)).intValue();
?????????????????? s.release();
????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
?????????????? return i;
????????? }???
???? }
?
???? 生產(chǎn)者:
???? public class Producer extends Thread{
????????? private Buffer b;
????????? private int count;
????????? private int step;
????????? private int id;
?
????????? public Producer(Buffer b,int step,int id){
?????????????? this.b =? b;
????????????? this.step = step;
????????????? this.id = id;
?????????????? count = 0;
????????? }
????
????????? public void run(){
????????????? try{
?????????????????? while(true){
?????????????????????? System.out.println("In put");
??????????????????????? b.put(count);
??????????????????????? System.out.println("Producer "+id+":"+count);
??????????????????????? count++;
?????????????????????? Thread.sleep(step);
??????????????????????? System.out.println("Out put");
?????????????????? }
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }
???? }
?
???? 消費(fèi)者:
???? public class Consumer extends Thread{
????????? private Buffer b;
????????? private int step;
????????? private int id;
????
????????? public Consumer(Buffer b,int step,int id){
????????????? this.b = b;
?????????????? this.step = step;
????????????? this.id = id;
????????? }
?????????
????????? public void run(){
????????????? try{
?????????????????? while(true){
??????????????????????? System.out.println("In get");
?????????????????????? System.out.println("\t\tConsume "+id+":"+b.get());
??????????????????????? System.out.println("Out get");
??????????????????????? Thread.sleep(step);
?????????????????? }
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }???
????????? }
???? }
?
???? 測試程序:
???? public class CPTest{
????????? public static void main(String[] args){
?????????????? Buffer b = new Buffer(3);
????????????? Consumer c1 = new Consumer(b,1000,1);
????????????? Consumer c2 = new Consumer(b,1000,2);
?????????????? Producer p1 = new Producer(b,100,1);
????????????? Producer p2 = new Producer(b,100,2);
????????
????????????? c1.start();
?????????????? c2.start();
????????????? p1.start();
????????????? p2.start();
????????? }
???? }
?
5.????? CyclicBarrier
???? CyclicBarrier可以讓一組線程在某一個(gè)時(shí)間點(diǎn)上進(jìn)行等待,當(dāng)所有進(jìn)程都到達(dá)該等待點(diǎn)后,再繼續(xù)往下執(zhí)行。CyclicBarrier使用完以后,通過調(diào)用reset()方法,可以重用該CyclicBarrier。線程通過調(diào)用await()來減少計(jì)數(shù)。
?
CyclicBarrier
?
?
?
?
?
?
?
?
???? 例子:
???? 任務(wù):
???? public class Task extends Thread{
????????? private String id;
????????? private CyclicBarrier c;
????????? private int time;
????
????????? public Task(CyclicBarrier c,String id,int time){
?????????????? this.c = c;
????????????? this.id = id;
?????????????? this.time = time;
????????? }
????
????????? public void run(){
?????????????? try{
?????????????????? System.out.println(id+" Start");
????????????????? Thread.sleep(time);
?????????????????? System.out.println(id+" Finish");
?????????????????? c.await();
?????????????????? System.out.println(id+" Exit");?????????
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }???
???? }
?
???? 測試類:
???? public class Test{
????????? public static void main(String[] args){
????????????? CyclicBarrier c = new CyclicBarrier(3,new Runnable(){
?????????????????? public void run(){
??????????????????????? System.out.println("All Work Done");
?????????????????? }
????????????? });
?????????????? Task t1 = new Task(c,"1",1000);
????????????? Task t2 = new Task(c,"2",3000);
????????????? Task t3 = new Task(c,"3",5000);
?????????????? t1.start();
????????????? t2.start();
????????????? t3.start();???????
????????? }
???? }
?
???? 輸出結(jié)果:
???? 1 Start
???? 2 Start
???? 3 Start
???? 1 Finish
???? 2 Finish
???? 3 Finish
???? All Work Done
???? 3 Exit
???? 1 Exit
???? 2 Exit
?
6.????? CountdownLatch
???? CountdownLatch具有與CyclicBarrier相似的功能,也能讓一組線程在某個(gè)點(diǎn)上進(jìn)行同步。但是與CyclicBarrier不同的是:1.CountdownLatch不能重用,2.線程在CountdownLatch上調(diào)用await()操作一定會(huì)被阻塞,直到計(jì)數(shù)值為0時(shí)才會(huì)被喚醒,而且計(jì)數(shù)值只能通過conutDown()方法進(jìn)行減少。
特別的,當(dāng)CountdownLatch的值為1時(shí),該Latch被稱為“啟動(dòng)大門”,所有任務(wù)線程都在該Latch上await(),直到某個(gè)非任務(wù)線程調(diào)用countDown()觸發(fā),所有任務(wù)線程開始同時(shí)工作。
?
7.????? Exchanger
???? Exchanger是一個(gè)類似于計(jì)數(shù)值為2的CyclicBarrier。她允許兩個(gè)線程在某個(gè)點(diǎn)上進(jìn)行數(shù)據(jù)交換。
?????? 例子:
???? public class FillAndEmpty {
???????? Exchanger<DataBuffer> exchanger = new Exchanger();
???????? DataBuffer initialEmptyBuffer = ... a made-up type
???????? DataBuffer initialFullBuffer = ...
?
???????? public class FillingLoop implements Runnable {
????????????? public void run() {
?????????????????? DataBuffer currentBuffer = initialEmptyBuffer;
?????????????????? try {
?????????????????????? while (currentBuffer != null) {
??????????????????????????? addToBuffer(currentBuffer);
??????????????????????????? if (currentBuffer.full())
???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);
?????????????????????? }
?????????????????? }catch(InterruptedException ex) { ... handle ... }
????????????? }
???????? }
?
???????? public class EmptyingLoop implements Runnable {
????????????? public void run() {
?????????????????? DataBuffer currentBuffer = initialFullBuffer;
?????????????????? try {
?????????????????????? while (currentBuffer != null) {
??????????????????????????? takeFromBuffer(currentBuffer);
??????????????????????????? if (currentBuffer.empty())
???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);
?????????????????????? }
?????????????????? } catch (InterruptedException ex) { ... handle ...}
????????????? }
???????? }
?
???????? public void start() {
????????????? new Thread(new FillingLoop()).start();
????????????? new Thread(new EmptyingLoop()).start();
???????? }
???? }
Exchange
?
?
?
????
?
?
?
?
?
?
?
?
8.????? Lock,Condition
???? 鎖是最基本的同步原語。通過在鎖上面調(diào)用lock()和unlock()操作,可以達(dá)到與synchronized關(guān)鍵字相似的效果,但是有一點(diǎn)要注意的是,鎖必須顯式釋放,如果由于拋出異常,而沒有釋放鎖,將導(dǎo)致死鎖出現(xiàn)。Condition提供的await(),signal(),signal()操作,與原來的wai(),notify(),notifyAll()操作具有相似的含義。Lock的兩個(gè)主要子類是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允許多人讀,而一人寫。
???? 例子:
???? 使用Lock和Condition的生產(chǎn)者,消費(fèi)者問題
???? public class BoundedBuffer {
???????? final Lock lock = new ReentrantLock();
???????? final Condition notFull? = lock.newCondition();
???????? final Condition notEmpty = lock.newCondition();
???????? final Object[] items = new Object[100];
???????? int putptr, takeptr, count;
????????
???????? public void put(Object x) throws InterruptedException {
????????????? lock.lock();
????????????? try {
?????????????????? while (count == items.length)
?????????????????????? notFull.await();
?????????????????? items[putptr] = x;
?????????????????? if (++putptr == items.length)
??????????????????????? putptr = 0;
?????????????????? ++count;
?????????????????? notEmpty.signal();
????????????? } finally {
?????????????????? lock.unlock();
?????????????? }
????????? }
????
????????? public Object take() throws InterruptedException {
?????????????? lock.lock();
????????????? try {
?????????????????? while (count == 0)
?????????????????????? notEmpty.await();
?????????????????? Object x = items[takeptr];
?????????????????? if (++takeptr == items.length)
??????????????????????? takeptr = 0;
?????????????????? --count;
?????????????????? notFull.signal();
?????????????????? return x;
?????????????? } finally {
?????????????????? lock.unlock();
????????????? }
????????? }
???? }???
?
9.????? 小結(jié):新的concurrent包提供了一個(gè)從低到高的同步操作。
?
posted on 2007-01-22 17:33
Lib 閱讀(1606)
評論(0) 編輯 收藏 所屬分類:
Java