9.?? 同步(Concurrent)
????
1.????? Executor接口
???? Executor接口提供了一個類似于線程池的管理工具。用于只需要往Executor中提交Runnable對象,剩下的啟動線程等工作,都會有對應的實現類來完成。ScheduledExecutorService比ExecutorService增加了,時間上的控制,即用戶可以在提交的時候額外的定義該任務的啟動時機,以及隨后的執行間隔和延遲等。
???? 例子:
???? 任務:
???? public class ETask implements Runnable{
????????? private int id = 0;
????????? public ETask(int id){
?????????????? this.id = id;
????????? }
????????? public void run(){
?????????????? try{
?????????????????? System.out.println(id+" Start");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Do");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Exit");
????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }
???? }
????
???? 測試類:
???? public class ETest{
????????? public static void main(String[] args){???????
????????????? ExecutorService executor = Executors.newFixedThreadPool(2);
????????????? for(int i=0;i<5;i++){
?????????????????? Runnable r = new ETask(i);
?????????????????? executor.execute(r);
????????????? }
????????????? executor.shutdown();
????????? }
???? }
?
???? 輸出:
???? 0 Start
???? 1 Start
???? 0 Do
???? 1 Do
???? 0 Exit
???? 2 Start
???? 1 Exit
???? 3 Start
???? 2 Do
???? 3 Do
???? 2 Exit
???? 3 Exit
???? 4 Start
???? 4 Do
???? 4 Exit
?
2.????? Future和Callable
???? Callable是一個類似于Runnable的接口,他與Runnable的區別是,她在執行完畢之后能夠返回結果。Future用于獲取線程的執行結果,或者取消已向Executor的任務。當我們通過Future提供的get()方法獲取任務的執行結果時,如果任務沒有完成,則調用get()方法的線程將會被阻塞,知道任務完成為止。一般我們都會使用Future的實現類FutureTask。
???? 例子:
???? Callable對象:
???? public class ETask implements Callable{
????????? private String id = null;
????????? public ETask(String id){
?????????????? this.id = id;
????????? }
????
????????? public String call(){
????????????? try{
?????????????????? System.out.println(id+" Start");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Do");
?????????????????? Thread.sleep(1000);
?????????????????? System.out.println(id+" Exit");??????????
????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????????? return id;
????????? }
???? }
?
???? 測試類:
???? public class ETest{
????????? public static void main(String[] args){???????
????????????? ExecutorService executor = Executors.newFixedThreadPool(2);
????????????? for(int i=0;i<5;i++){???????????
?????????????????? try{
?????????????????????? Callable c = new ETask(String.valueOf(i));
??????????????????????? FutureTask ft = new FutureTask(c);
??????????????????????? executor.execute(ft);
??????????????????????? System.out.println("Finish:" + ft.get());?????????
?????????????????? }catch(Exception e){
?????????????????????? e.printStackTrace();
?????????????????? }
????????????? }
?????????????? executor.shutdown();
????????? }
???? }
?
???? 輸出:
???? 0 Start
???? 0 Do
???? 0 Exit
???? Finish:0
???? 1 Start
???? 1 Do
???? 1 Exit
???? Finish:1
???? 2 Start
???? …
3.????? CompletionService和ExecutorCompletionService
???? CompletionService類似于一個Executor和Queue的混合。我們可以通過submit()向CompletionService提交任務,然后通過poll()來獲取第一個完成的任務,也可以通過take()來阻塞等待下一個完成的任務。ExecutorCompletionService是CompletionService的實現類,他需要提供一個Executor作為構造函數的參數。
???? 例子:
???? Executor executor = …;
???? CompletionService cs = new ExecutorCompletionService(executor);
???? Future fs = cs.submit(…);
???? Future ft = cs.take();
?
4.????? Semaphore
???? 信號量是用于同步和互斥的低級原語。信號量提供的acquire()和release()操作,與操作系統上的p,v操作同。
???? 例子:
???? 緩沖區:
???? public class Buffer{
????????? private Semaphore s = null;
????????? private Semaphore p = null;
????????? Vector<Integer> v = new Vector<Integer>();
?????????
????????? public Buffer(int capacity){
?????????????? s = new Semaphore(capacity);
????????????? p = new Semaphore(0);
????????? }
????
????????? public void put(int i){
????????????? try{
?????????????????? s.acquire();
?????????????????? v.add(new Integer(i));
?????????????????? p.release();
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }
????
????????? public int get(){?
?????????????? int i = 0;
????????????? try{
?????????????????? p.acquire();
?????????????????? i = ((Integer)v.remove(0)).intValue();
?????????????????? s.release();
????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
?????????????? return i;
????????? }???
???? }
?
???? 生產者:
???? public class Producer extends Thread{
????????? private Buffer b;
????????? private int count;
????????? private int step;
????????? private int id;
?
????????? public Producer(Buffer b,int step,int id){
?????????????? this.b =? b;
????????????? this.step = step;
????????????? this.id = id;
?????????????? count = 0;
????????? }
????
????????? public void run(){
????????????? try{
?????????????????? while(true){
?????????????????????? System.out.println("In put");
??????????????????????? b.put(count);
??????????????????????? System.out.println("Producer "+id+":"+count);
??????????????????????? count++;
?????????????????????? Thread.sleep(step);
??????????????????????? System.out.println("Out put");
?????????????????? }
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }
???? }
?
???? 消費者:
???? public class Consumer extends Thread{
????????? private Buffer b;
????????? private int step;
????????? private int id;
????
????????? public Consumer(Buffer b,int step,int id){
????????????? this.b = b;
?????????????? this.step = step;
????????????? this.id = id;
????????? }
?????????
????????? public void run(){
????????????? try{
?????????????????? while(true){
??????????????????????? System.out.println("In get");
?????????????????????? System.out.println("\t\tConsume "+id+":"+b.get());
??????????????????????? System.out.println("Out get");
??????????????????????? Thread.sleep(step);
?????????????????? }
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }???
????????? }
???? }
?
???? 測試程序:
???? public class CPTest{
????????? public static void main(String[] args){
?????????????? Buffer b = new Buffer(3);
????????????? Consumer c1 = new Consumer(b,1000,1);
????????????? Consumer c2 = new Consumer(b,1000,2);
?????????????? Producer p1 = new Producer(b,100,1);
????????????? Producer p2 = new Producer(b,100,2);
????????
????????????? c1.start();
?????????????? c2.start();
????????????? p1.start();
????????????? p2.start();
????????? }
???? }
?
5.????? CyclicBarrier
???? CyclicBarrier可以讓一組線程在某一個時間點上進行等待,當所有進程都到達該等待點后,再繼續往下執行。CyclicBarrier使用完以后,通過調用reset()方法,可以重用該CyclicBarrier。線程通過調用await()來減少計數。
?
CyclicBarrier
?
?
?
?
?
?
?
?
???? 例子:
???? 任務:
???? public class Task extends Thread{
????????? private String id;
????????? private CyclicBarrier c;
????????? private int time;
????
????????? public Task(CyclicBarrier c,String id,int time){
?????????????? this.c = c;
????????????? this.id = id;
?????????????? this.time = time;
????????? }
????
????????? public void run(){
?????????????? try{
?????????????????? System.out.println(id+" Start");
????????????????? Thread.sleep(time);
?????????????????? System.out.println(id+" Finish");
?????????????????? c.await();
?????????????????? System.out.println(id+" Exit");?????????
?????????????? }catch(Exception e){
?????????????????? e.printStackTrace();
????????????? }
????????? }???
???? }
?
???? 測試類:
???? public class Test{
????????? public static void main(String[] args){
????????????? CyclicBarrier c = new CyclicBarrier(3,new Runnable(){
?????????????????? public void run(){
??????????????????????? System.out.println("All Work Done");
?????????????????? }
????????????? });
?????????????? Task t1 = new Task(c,"1",1000);
????????????? Task t2 = new Task(c,"2",3000);
????????????? Task t3 = new Task(c,"3",5000);
?????????????? t1.start();
????????????? t2.start();
????????????? t3.start();???????
????????? }
???? }
?
???? 輸出結果:
???? 1 Start
???? 2 Start
???? 3 Start
???? 1 Finish
???? 2 Finish
???? 3 Finish
???? All Work Done
???? 3 Exit
???? 1 Exit
???? 2 Exit
?
6.????? CountdownLatch
???? CountdownLatch具有與CyclicBarrier相似的功能,也能讓一組線程在某個點上進行同步。但是與CyclicBarrier不同的是:1.CountdownLatch不能重用,2.線程在CountdownLatch上調用await()操作一定會被阻塞,直到計數值為0時才會被喚醒,而且計數值只能通過conutDown()方法進行減少。
特別的,當CountdownLatch的值為1時,該Latch被稱為“啟動大門”,所有任務線程都在該Latch上await(),直到某個非任務線程調用countDown()觸發,所有任務線程開始同時工作。
?
7.????? Exchanger
???? Exchanger是一個類似于計數值為2的CyclicBarrier。她允許兩個線程在某個點上進行數據交換。
?????? 例子:
???? public class FillAndEmpty {
???????? Exchanger<DataBuffer> exchanger = new Exchanger();
???????? DataBuffer initialEmptyBuffer = ... a made-up type
???????? DataBuffer initialFullBuffer = ...
?
???????? public class FillingLoop implements Runnable {
????????????? public void run() {
?????????????????? DataBuffer currentBuffer = initialEmptyBuffer;
?????????????????? try {
?????????????????????? while (currentBuffer != null) {
??????????????????????????? addToBuffer(currentBuffer);
??????????????????????????? if (currentBuffer.full())
???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);
?????????????????????? }
?????????????????? }catch(InterruptedException ex) { ... handle ... }
????????????? }
???????? }
?
???????? public class EmptyingLoop implements Runnable {
????????????? public void run() {
?????????????????? DataBuffer currentBuffer = initialFullBuffer;
?????????????????? try {
?????????????????????? while (currentBuffer != null) {
??????????????????????????? takeFromBuffer(currentBuffer);
??????????????????????????? if (currentBuffer.empty())
???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);
?????????????????????? }
?????????????????? } catch (InterruptedException ex) { ... handle ...}
????????????? }
???????? }
?
???????? public void start() {
????????????? new Thread(new FillingLoop()).start();
????????????? new Thread(new EmptyingLoop()).start();
???????? }
???? }
Exchange
?
?
?
????
?
?
?
?
?
?
?
?
8.????? Lock,Condition
???? 鎖是最基本的同步原語。通過在鎖上面調用lock()和unlock()操作,可以達到與synchronized關鍵字相似的效果,但是有一點要注意的是,鎖必須顯式釋放,如果由于拋出異常,而沒有釋放鎖,將導致死鎖出現。Condition提供的await(),signal(),signal()操作,與原來的wai(),notify(),notifyAll()操作具有相似的含義。Lock的兩個主要子類是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允許多人讀,而一人寫。
???? 例子:
???? 使用Lock和Condition的生產者,消費者問題
???? public class BoundedBuffer {
???????? final Lock lock = new ReentrantLock();
???????? final Condition notFull? = lock.newCondition();
???????? final Condition notEmpty = lock.newCondition();
???????? final Object[] items = new Object[100];
???????? int putptr, takeptr, count;
????????
???????? public void put(Object x) throws InterruptedException {
????????????? lock.lock();
????????????? try {
?????????????????? while (count == items.length)
?????????????????????? notFull.await();
?????????????????? items[putptr] = x;
?????????????????? if (++putptr == items.length)
??????????????????????? putptr = 0;
?????????????????? ++count;
?????????????????? notEmpty.signal();
????????????? } finally {
?????????????????? lock.unlock();
?????????????? }
????????? }
????
????????? public Object take() throws InterruptedException {
?????????????? lock.lock();
????????????? try {
?????????????????? while (count == 0)
?????????????????????? notEmpty.await();
?????????????????? Object x = items[takeptr];
?????????????????? if (++takeptr == items.length)
??????????????????????? takeptr = 0;
?????????????????? --count;
?????????????????? notFull.signal();
?????????????????? return x;
?????????????? } finally {
?????????????????? lock.unlock();
????????????? }
????????? }
???? }???
?
9.????? 小結:新的concurrent包提供了一個從低到高的同步操作。
?