<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-71  評論-4  文章-0  trackbacks-0
    java語言已經(jīng)內(nèi)置了多線程支持,所有實(shí)現(xiàn)Runnable接口的類都可被啟動(dòng)一個(gè)新線程,新線程會(huì)執(zhí)行該實(shí)例的run()方法,當(dāng)run()方法執(zhí)行完畢后,線程就結(jié)束了。一旦一個(gè)線程執(zhí)行完畢,這個(gè)實(shí)例就不能再重新啟動(dòng),只能重新生成一個(gè)新實(shí)例,再啟動(dòng)一個(gè)新線程。

    Thread類是實(shí)現(xiàn)了Runnable接口的一個(gè)實(shí)例,它代表一個(gè)線程的實(shí)例,并且,啟動(dòng)線程的唯一方法就是通過Thread類的start()實(shí)例方法:

    Thread t = new Thread();
    t.start();

    start()方法是一個(gè)native方法,它將啟動(dòng)一個(gè)新線程,并執(zhí)行run()方法。Thread類默認(rèn)的run()方法什么也不做就退出了。注意:直接調(diào)用run()方法并不會(huì)啟動(dòng)一個(gè)新線程,它和調(diào)用一個(gè)普通的java方法沒有什么區(qū)別。

    因此,有兩個(gè)方法可以實(shí)現(xiàn)自己的線程:

    方法1:自己的類extend Thread,并復(fù)寫run()方法,就可以啟動(dòng)新線程并執(zhí)行自己定義的run()方法。例如:

    public class MyThread extends Thread {
        public run() {
            System.out.println("MyThread.run()");
        }
    }

    在合適的地方啟動(dòng)線程:new MyThread().start();

    方法2:如果自己的類已經(jīng)extends另一個(gè)類,就無法直接extends Thread,此時(shí),必須實(shí)現(xiàn)一個(gè)Runnable接口:

    public class MyThread extends OtherClass implements Runnable {
        public run() {
            System.out.println("MyThread.run()");
        }
    }

    為了啟動(dòng)MyThread,需要首先實(shí)例化一個(gè)Thread,并傳入自己的MyThread實(shí)例:

    MyThread myt = new MyThread();
    Thread t = new Thread(myt);
    t.start();

    事實(shí)上,當(dāng)傳入一個(gè)Runnable target參數(shù)給Thread后,Thread的run()方法就會(huì)調(diào)用target.run(),參考JDK源代碼:

    public void run() {
        if (target != null) {
            target.run();
        }
    }

    線程還有一些Name, ThreadGroup, isDaemon等設(shè)置,由于和線程設(shè)計(jì)模式關(guān)聯(lián)很少,這里就不多說了。


    由于同一進(jìn)程內(nèi)的多個(gè)線程共享內(nèi)存空間,在Java中,就是共享實(shí)例,當(dāng)多個(gè)線程試圖同時(shí)修改某個(gè)實(shí)例的內(nèi)容時(shí),就會(huì)造成沖突,因此,線程必須實(shí)現(xiàn)共享互斥,使多線程同步。

    最簡單的同步是將一個(gè)方法標(biāo)記為synchronized,對同一個(gè)實(shí)例來說,任一時(shí)刻只能有一個(gè)synchronized方法在執(zhí)行。當(dāng)一個(gè)方法正在執(zhí)行某個(gè)synchronized方法時(shí),其他線程如果想要執(zhí)行這個(gè)實(shí)例的任意一個(gè)synchronized方法,都必須等待當(dāng)前執(zhí)行 synchronized方法的線程退出此方法后,才能依次執(zhí)行。

    但是,非synchronized方法不受影響,不管當(dāng)前有沒有執(zhí)行synchronized方法,非synchronized方法都可以被多個(gè)線程同時(shí)執(zhí)行。

    此外,必須注意,只有同一實(shí)例的synchronized方法同一時(shí)間只能被一個(gè)線程執(zhí)行,不同實(shí)例的synchronized方法是可以并發(fā)的。例如,class A定義了synchronized方法sync(),則不同實(shí)例a1.sync()和a2.sync()可以同時(shí)由兩個(gè)線程來執(zhí)行。


    多線程同步的實(shí)現(xiàn)最終依賴鎖機(jī)制。我們可以想象某一共享資源是一間屋子,每個(gè)人都是一個(gè)線程。當(dāng)A希望進(jìn)入房間時(shí),他必須獲得門鎖,一旦A獲得門鎖,他進(jìn)去后就立刻將門鎖上,于是B,C,D...就不得不在門外等待,直到A釋放鎖出來后,B,C,D...中的某一人搶到了該鎖(具體搶法依賴于 JVM的實(shí)現(xiàn),可以先到先得,也可以隨機(jī)挑選),然后進(jìn)屋又將門鎖上。這樣,任一時(shí)刻最多有一人在屋內(nèi)(使用共享資源)。

    Java語言規(guī)范內(nèi)置了對多線程的支持。對于Java程序來說,每一個(gè)對象實(shí)例都有一把“鎖”,一旦某個(gè)線程獲得了該鎖,別的線程如果希望獲得該鎖,只能等待這個(gè)線程釋放鎖之后。獲得鎖的方法只有一個(gè),就是synchronized關(guān)鍵字。例如:

    public class SharedResource {
        private int count = 0;

        public int getCount() { return count; }

        public synchronized void setCount(int count) { this.count = count; }

    }

    同步方法public synchronized void setCount(int count) { this.count = count; } 事實(shí)上相當(dāng)于:

    public void setCount(int count) {
        synchronized(this) { // 在此獲得this鎖
             this.count = count;
        } // 在此釋放this鎖
    }

    紅色部分表示需要同步的代碼段,該區(qū)域?yàn)椤拔kU(xiǎn)區(qū)域”,如果兩個(gè)以上的線程同時(shí)執(zhí)行,會(huì)引發(fā)沖突,因此,要更改SharedResource的內(nèi)部狀態(tài),必須先獲得SharedResource實(shí)例的鎖。

    退出synchronized塊時(shí),線程擁有的鎖自動(dòng)釋放,于是,別的線程又可以獲取該鎖了。

    為了提高性能,不一定要鎖定this,例如,SharedResource有兩個(gè)獨(dú)立變化的變量:

    public class SharedResouce {
        private int a = 0;
        private int b = 0;

        public synchronized void setA(int a) { this.a = a; }

        public synchronized void setB(int b) { this.b = b; }
    }

    若同步整個(gè)方法,則setA()的時(shí)候無法setB(),setB()時(shí)無法setA()。為了提高性能,可以使用不同對象的鎖:

    public class SharedResouce {
        private int a = 0;
        private int b = 0;
        private Object sync_a = new Object();
        private Object sync_b = new Object();

        public void setA(int a) {
            synchronized(sync_a) {
                this.a = a;
            }
        }

        public synchronized void setB(int b) {
            synchronized(sync_b) {
                this.b = b;
            }
        }
    }


    通常,多線程之間需要協(xié)調(diào)工作。例如,瀏覽器的一個(gè)顯示圖片的線程displayThread想要執(zhí)行顯示圖片的任務(wù),必須等待下載線程 downloadThread將該圖片下載完畢。如果圖片還沒有下載完,displayThread可以暫停,當(dāng)downloadThread完成了任務(wù)后,再通知displayThread“圖片準(zhǔn)備完畢,可以顯示了”,這時(shí),displayThread繼續(xù)執(zhí)行。

    以上邏輯簡單的說就是:如果條件不滿足,則等待。當(dāng)條件滿足時(shí),等待該條件的線程將被喚醒。在Java中,這個(gè)機(jī)制的實(shí)現(xiàn)依賴于wait/notify。等待機(jī)制與鎖機(jī)制是密切關(guān)聯(lián)的。例如:

    synchronized(obj) {
        while(!condition) {
            obj.wait();
        }
        obj.doSomething();
    }

    當(dāng)線程A獲得了obj鎖后,發(fā)現(xiàn)條件condition不滿足,無法繼續(xù)下一處理,于是線程A就wait()。

    在另一線程B中,如果B更改了某些條件,使得線程A的condition條件滿足了,就可以喚醒線程A:

    synchronized(obj) {
        condition = true;
        obj.notify();
    }

    需要注意的概念是:

    # 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) {...} 代碼段內(nèi)。

    # 調(diào)用obj.wait()后,線程A就釋放了obj的鎖,否則線程B無法獲得obj鎖,也就無法在synchronized(obj) {...} 代碼段內(nèi)喚醒A。

    # 當(dāng)obj.wait()方法返回后,線程A需要再次獲得obj鎖,才能繼續(xù)執(zhí)行。

    # 如果A1,A2,A3都在obj.wait(),則B調(diào)用obj.notify()只能喚醒A1,A2,A3中的一個(gè)(具體哪一個(gè)由JVM決定)。

    # obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續(xù)執(zhí)行obj.wait()的下一條語句,必須獲得obj鎖,因此,A1,A2,A3只有一個(gè)有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續(xù)執(zhí)行。

    # 當(dāng)B調(diào)用obj.notify/notifyAll的時(shí)候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個(gè)才有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行。



    前面講了wait/notify機(jī)制,Thread還有一個(gè)sleep()靜態(tài)方法,它也能使線程暫停一段時(shí)間。sleep與wait的不同點(diǎn)是: sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會(huì)使線程進(jìn)入obj對象的等待集合中并等待喚醒。

    但是wait()和sleep()都可以通過interrupt()方法打斷線程的暫停狀態(tài),從而使線程立刻拋出InterruptedException。

    如果線程A希望立即結(jié)束線程B,則可以對線程B對應(yīng)的Thread實(shí)例調(diào)用interrupt方法。如果此刻線程B正在wait/sleep/join,則線程B會(huì)立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結(jié)束線程。

    需要注意的是,InterruptedException是線程自己從內(nèi)部拋出的,并不是interrupt()方法拋出的。對某一線程調(diào)用 interrupt()時(shí),如果該線程正在執(zhí)行普通的代碼,那么該線程根本就不會(huì)拋出InterruptedException。但是,一旦該線程進(jìn)入到 wait()/sleep()/join()后,就會(huì)立刻拋出InterruptedException。



    GuardedSuspention模式主要思想是:

    當(dāng)條件不滿足時(shí),線程等待,直到條件滿足時(shí),等待該條件的線程被喚醒。

    我們設(shè)計(jì)一個(gè)客戶端線程和一個(gè)服務(wù)器線程,客戶端線程不斷發(fā)送請求給服務(wù)器線程,服務(wù)器線程不斷處理請求。當(dāng)請求隊(duì)列為空時(shí),服務(wù)器線程就必須等待,直到客戶端發(fā)送了請求。

    先定義一個(gè)請求隊(duì)列:Queue

    package com.crackj2ee.thread;

    import java.util.*;

    public class Queue {
        private List queue = new LinkedList();

        public synchronized Request getRequest() {
            while(queue.size()==0) {
                try {
                    this.wait();
                }
                catch(InterruptedException ie) {
                    return null;
                }
            }
            return (Request)queue.remove(0);
        }

        public synchronized void putRequest(Request request) {
            queue.add(request);
            this.notifyAll();
        }

    }

    藍(lán)色部分就是服務(wù)器線程的等待條件,而客戶端線程在放入了一個(gè)request后,就使服務(wù)器線程等待條件滿足,于是喚醒服務(wù)器線程。

    客戶端線程:ClientThread

    package com.crackj2ee.thread;

    public class ClientThread extends Thread {
        private Queue queue;
        private String clientName;

        public ClientThread(Queue queue, String clientName) {
            this.queue = queue;
            this.clientName = clientName;
        }

        public String toString() {
            return "[ClientThread-" + clientName + "]";
        }

        public void run() {
            for(int i=0; i<100; i++) {
                Request request = new Request("" + (long)(Math.random()*10000));
                System.out.println(this + " send request: " + request);
                queue.putRequest(request);
                try {
                    Thread.sleep((long)(Math.random() * 10000 + 1000));
                }
                catch(InterruptedException ie) {
                }
            }
            System.out.println(this + " shutdown.");
        }
    }

    服務(wù)器線程:ServerThread

    package com.crackj2ee.thread;
    public class ServerThread extends Thread {
        private boolean stop = false;
        private Queue queue;

        public ServerThread(Queue queue) {
            this.queue = queue;
        }

        public void shutdown() {
            stop = true;
            this.interrupt();
            try {
                this.join();
            }
            catch(InterruptedException ie) {}
        }

        public void run() {
            while(!stop) {
                Request request = queue.getRequest();
                System.out.println("[ServerThread] handle request: " + request);
                try {
                    Thread.sleep(2000);
                }
                catch(InterruptedException ie) {}
            }
            System.out.println("[ServerThread] shutdown.");
        }
    }

    服務(wù)器線程在紅色部分可能會(huì)阻塞,也就是說,Queue.getRequest是一個(gè)阻塞方法。這和java標(biāo)準(zhǔn)庫的許多IO方法類似。

    最后,寫一個(gè)Main來啟動(dòng)他們:

    package com.crackj2ee.thread;

    public class Main {

        public static void main(String[] args) {
            Queue queue = new Queue();
            ServerThread server = new ServerThread(queue);
            server.start();
            ClientThread[] clients = new ClientThread[5];
            for(int i=0; i<clients.length; i++) {
                clients[i] = new ClientThread(queue, ""+i);
                clients[i].start();
            }
            try {
                Thread.sleep(100000);
            }
            catch(InterruptedException ie) {}
            server.shutdown();
        }
    }

    我們啟動(dòng)了5個(gè)客戶端線程和一個(gè)服務(wù)器線程,運(yùn)行結(jié)果如下:

    [ClientThread-0] send request: Request-4984
    [ServerThread] handle request: Request-4984
    [ClientThread-1] send request: Request-2020
    [ClientThread-2] send request: Request-8980
    [ClientThread-3] send request: Request-5044
    [ClientThread-4] send request: Request-548
    [ClientThread-4] send request: Request-6832
    [ServerThread] handle request: Request-2020
    [ServerThread] handle request: Request-8980
    [ServerThread] handle request: Request-5044
    [ServerThread] handle request: Request-548
    [ClientThread-4] send request: Request-1681
    [ClientThread-0] send request: Request-7859
    [ClientThread-3] send request: Request-3926
    [ServerThread] handle request: Request-6832
    [ClientThread-2] send request: Request-9906
    ......

    可以觀察到ServerThread處理來自不同客戶端的請求。

    思考

    Q: 服務(wù)器線程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?

    A: 在這個(gè)例子中可以,因?yàn)榉?wù)器線程只有一個(gè)。但是,如果服務(wù)器線程有多個(gè)(例如Web應(yīng)用程序有多個(gè)線程處理并發(fā)請求,這非常普遍),就會(huì)造成嚴(yán)重問題。

    Q: 能否用sleep(1000)代替wait()?

    A: 絕對不可以。sleep()不會(huì)釋放鎖,因此sleep期間別的線程根本沒有辦法調(diào)用getRequest()和putRequest(),導(dǎo)致所有相關(guān)線程都被阻塞。

    Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?

    A: 不可以。因?yàn)閣hile()是測試queue,remove()是使用queue,兩者是一個(gè)原子操作,不能放在synchronized外面。

    總結(jié)

    多線程設(shè)計(jì)看似簡單,實(shí)際上必須非常仔細(xì)地考慮各種鎖定/同步的條件,稍不小心,就可能出錯(cuò)。并且,當(dāng)線程較少時(shí),很可能發(fā)現(xiàn)不了問題,一旦問題出現(xiàn)又難以調(diào)試。

    所幸的是,已有一些被驗(yàn)證過的模式可以供我們使用,我們會(huì)繼續(xù)介紹一些常用的多線程設(shè)計(jì)模式。


    前面談了多線程應(yīng)用程序能極大地改善用戶相應(yīng)。例如對于一個(gè)Web應(yīng)用程序,每當(dāng)一個(gè)用戶請求服務(wù)器連接時(shí),服務(wù)器就可以啟動(dòng)一個(gè)新線程為用戶服務(wù)。

    然而,創(chuàng)建和銷毀線程本身就有一定的開銷,如果頻繁創(chuàng)建和銷毀線程,CPU和內(nèi)存開銷就不可忽略,垃圾收集器還必須負(fù)擔(dān)更多的工作。因此,線程池就是為了避免頻繁創(chuàng)建和銷毀線程。

    每當(dāng)服務(wù)器接受了一個(gè)新的請求后,服務(wù)器就從線程池中挑選一個(gè)等待的線程并執(zhí)行請求處理。處理完畢后,線程并不結(jié)束,而是轉(zhuǎn)為阻塞狀態(tài)再次被放入線程池中。這樣就避免了頻繁創(chuàng)建和銷毀線程。

    Worker Pattern實(shí)現(xiàn)了類似線程池的功能。首先定義Task接口:

    package com.crackj2ee.thread;
    public interface Task {
        void execute();
    }

    線程將負(fù)責(zé)執(zhí)行execute()方法。注意到任務(wù)是由子類通過實(shí)現(xiàn)execute()方法實(shí)現(xiàn)的,線程本身并不知道自己執(zhí)行的任務(wù)。它只負(fù)責(zé)運(yùn)行一個(gè)耗時(shí)的execute()方法。

    具體任務(wù)由子類實(shí)現(xiàn),我們定義了一個(gè)CalculateTask和一個(gè)TimerTask:

    // CalculateTask.java
    package com.crackj2ee.thread;
    public class CalculateTask implements Task {
        private static int count = 0;
        private int num = count;
        public CalculateTask() {
            count++;
        }
        public void execute() {
            System.out.println("[CalculateTask " + num + "] start...");
            try {
                Thread.sleep(3000);
            }
            catch(InterruptedException ie) {}
            System.out.println("[CalculateTask " + num + "] done.");
        }
    }

    // TimerTask.java
    package com.crackj2ee.thread;
    public class TimerTask implements Task {
        private static int count = 0;
        private int num = count;
        public TimerTask() {
            count++;
        }
        public void execute() {
            System.out.println("[TimerTask " + num + "] start...");
            try {
                Thread.sleep(2000);
            }
            catch(InterruptedException ie) {}
            System.out.println("[TimerTask " + num + "] done.");
        }
    }

    以上任務(wù)均簡單的sleep若干秒。

    TaskQueue實(shí)現(xiàn)了一個(gè)隊(duì)列,客戶端可以將請求放入隊(duì)列,服務(wù)器線程可以從隊(duì)列中取出任務(wù):

    package com.crackj2ee.thread;
    import java.util.*;
    public class TaskQueue {
        private List queue = new LinkedList();
        public synchronized Task getTask() {
            while(queue.size()==0) {
                try {
                    this.wait();
                }
                catch(InterruptedException ie) {
                    return null;
                }
            }
            return (Task)queue.remove(0);
        }
        public synchronized void putTask(Task task) {
            queue.add(task);
            this.notifyAll();
        }
    }

    終于到了真正的WorkerThread,這是真正執(zhí)行任務(wù)的服務(wù)器線程:

    package com.crackj2ee.thread;
    public class WorkerThread extends Thread {
        private static int count = 0;
        private boolean busy = false;
        private boolean stop = false;
        private TaskQueue queue;
        public WorkerThread(ThreadGroup group, TaskQueue queue) {
            super(group, "worker-" + count);
            count++;
            this.queue = queue;
        }
        public void shutdown() {
            stop = true;
            this.interrupt();
            try {
                this.join();
            }
            catch(InterruptedException ie) {}
        }
        public boolean isIdle() {
            return !busy;
        }
        public void run() {
            System.out.println(getName() + " start.");        
            while(!stop) {
                Task task = queue.getTask();
                if(task!=null) {
                    busy = true;
                    task.execute();
                    busy = false;
                }
            }
            System.out.println(getName() + " end.");
        }
    }

    前面已經(jīng)講過,queue.getTask()是一個(gè)阻塞方法,服務(wù)器線程可能在此wait()一段時(shí)間。此外,WorkerThread還有一個(gè)shutdown方法,用于安全結(jié)束線程。

    最后是ThreadPool,負(fù)責(zé)管理所有的服務(wù)器線程,還可以動(dòng)態(tài)增加和減少線程數(shù):

    package com.crackj2ee.thread;
    import java.util.*;
    public class ThreadPool extends ThreadGroup {
        private List threads = new LinkedList();
        private TaskQueue queue;
        public ThreadPool(TaskQueue queue) {
            super("Thread-Pool");
            this.queue = queue;
        }
        public synchronized void addWorkerThread() {
            Thread t = new WorkerThread(this, queue);
            threads.add(t);
            t.start();
        }
        public synchronized void removeWorkerThread() {
            if(threads.size()>0) {
                WorkerThread t = (WorkerThread)threads.remove(0);
                t.shutdown();
            }
        }
        public synchronized void currentStatus() {
            System.out.println("-----------------------------------------------");
            System.out.println("Thread count = " + threads.size());
            Iterator it = threads.iterator();
            while(it.hasNext()) {
                WorkerThread t = (WorkerThread)it.next();
                System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
            }
            System.out.println("-----------------------------------------------");
        }
    }

    currentStatus()方法是為了方便調(diào)試,打印出所有線程的當(dāng)前狀態(tài)。

    最后,Main負(fù)責(zé)完成main()方法:

    package com.crackj2ee.thread;
    public class Main {
        public static void main(String[] args) {
            TaskQueue queue = new TaskQueue();
            ThreadPool pool = new ThreadPool(queue);
            for(int i=0; i<10; i++) {
                queue.putTask(new CalculateTask());
                queue.putTask(new TimerTask());
            }
            pool.addWorkerThread();
            pool.addWorkerThread();
            doSleep(8000);
            pool.currentStatus();
            pool.addWorkerThread();
            pool.addWorkerThread();
            pool.addWorkerThread();
            pool.addWorkerThread();
            pool.addWorkerThread();
            doSleep(5000);
            pool.currentStatus();
        }
        private static void doSleep(long ms) {
            try {
                Thread.sleep(ms);
            }
            catch(InterruptedException ie) {}
        }
    }

    main()一開始放入了20個(gè)Task,然后動(dòng)態(tài)添加了一些服務(wù)線程,并定期打印線程狀態(tài),運(yùn)行結(jié)果如下:

    worker-0 start.
    [CalculateTask 0] start...
    worker-1 start.
    [TimerTask 0] start...
    [TimerTask 0] done.
    [CalculateTask 1] start...
    [CalculateTask 0] done.
    [TimerTask 1] start...
    [CalculateTask 1] done.
    [CalculateTask 2] start...
    [TimerTask 1] done.
    [TimerTask 2] start...
    [TimerTask 2] done.
    [CalculateTask 3] start...
    -----------------------------------------------
    Thread count = 2
    worker-0: busy
    worker-1: busy
    -----------------------------------------------
    [CalculateTask 2] done.
    [TimerTask 3] start...
    worker-2 start.
    [CalculateTask 4] start...
    worker-3 start.
    [TimerTask 4] start...
    worker-4 start.
    [CalculateTask 5] start...
    worker-5 start.
    [TimerTask 5] start...
    worker-6 start.
    [CalculateTask 6] start...
    [CalculateTask 3] done.
    [TimerTask 6] start...
    [TimerTask 3] done.
    [CalculateTask 7] start...
    [TimerTask 4] done.
    [TimerTask 7] start...
    [TimerTask 5] done.
    [CalculateTask 8] start...
    [CalculateTask 4] done.
    [TimerTask 8] start...
    [CalculateTask 5] done.
    [CalculateTask 9] start...
    [CalculateTask 6] done.
    [TimerTask 9] start...
    [TimerTask 6] done.
    [TimerTask 7] done.
    -----------------------------------------------
    Thread count = 7
    worker-0: idle
    worker-1: busy
    worker-2: busy
    worker-3: idle
    worker-4: busy
    worker-5: busy
    worker-6: busy
    -----------------------------------------------
    [CalculateTask 7] done.
    [CalculateTask 8] done.
    [TimerTask 8] done.
    [TimerTask 9] done.
    [CalculateTask 9] done.

    仔細(xì)觀察:一開始只有兩個(gè)服務(wù)器線程,因此線程狀態(tài)都是忙,后來線程數(shù)增多,6個(gè)線程中的兩個(gè)狀態(tài)變成idle,說明處于wait()狀態(tài)。

    思考:本例的線程調(diào)度算法其實(shí)根本沒有,因?yàn)檫@個(gè)應(yīng)用是圍繞TaskQueue設(shè)計(jì)的,不是以Thread Pool為中心設(shè)計(jì)的。因此,Task調(diào)度取決于TaskQueue的getTask()方法,你可以改進(jìn)這個(gè)方法,例如使用優(yōu)先隊(duì)列,使優(yōu)先級高的任務(wù)先被執(zhí)行。

    如果所有的服務(wù)器線程都處于busy狀態(tài),則說明任務(wù)繁忙,TaskQueue的隊(duì)列越來越長,最終會(huì)導(dǎo)致服務(wù)器內(nèi)存耗盡。因此,可以限制TaskQueue的等待任務(wù)數(shù),超過最大長度就拒絕處理。許多Web服務(wù)器在用戶請求繁忙時(shí)就會(huì)拒絕用戶:HTTP 503 SERVICE UNAVAILABLE


    多線程讀寫同一個(gè)對象的數(shù)據(jù)是很普遍的,通常,要避免讀寫沖突,必須保證任何時(shí)候僅有一個(gè)線程在寫入,有線程正在讀取的時(shí)候,寫入操作就必須等待。簡單說,就是要避免“寫-寫”沖突和“讀-寫”沖突。但是同時(shí)讀是允許的,因?yàn)椤白x-讀”不沖突,而且很安全。

    要實(shí)現(xiàn)以上的ReadWriteLock,簡單的使用synchronized就不行,我們必須自己設(shè)計(jì)一個(gè)ReadWriteLock類,在讀之前,必須先獲得“讀鎖”,寫之前,必須先獲得“寫鎖”。舉例說明:

    DataHandler對象保存了一個(gè)可讀寫的char[]數(shù)組:

    package com.crackj2ee.thread;

    public class DataHandler {
        // store data:
        private char[] buffer = "AAAAAAAAAA".toCharArray();

        private char[] doRead() {
            char[] ret = new char[buffer.length];
            for(int i=0; i<buffer.length; i++) {
                ret[i] = buffer[i];
                sleep(3);
            }
            return ret;
        }

        private void doWrite(char[] data) {
            if(data!=null) {
                buffer = new char[data.length];
                for(int i=0; i<buffer.length; i++) {
                    buffer[i] = data[i];
                    sleep(10);
                }
            }
        }

        private void sleep(int ms) {
            try {
                Thread.sleep(ms);
            }
            catch(InterruptedException ie) {}
        }
    }

    doRead()和doWrite()方法是非線程安全的讀寫方法。為了演示,加入了sleep(),并設(shè)置讀的速度大約是寫的3倍,這符合通常的情況。

    為了讓多線程能安全讀寫,我們設(shè)計(jì)了一個(gè)ReadWriteLock:

    package com.crackj2ee.thread;
    public class ReadWriteLock {
        private int readingThreads = 0;
        private int writingThreads = 0;
        private int waitingThreads = 0; // waiting for write
        private boolean preferWrite = true;

        public synchronized void readLock() throws InterruptedException {
            while(writingThreads>0 || (preferWrite && waitingThreads>0))
                this.wait();
            readingThreads++;
        }

        public synchronized void readUnlock() {
            readingThreads--;
            preferWrite = true;
            notifyAll();
        }

        public synchronized void writeLock() throws InterruptedException {
            waitingThreads++;
            try {
                while(readingThreads>0 || writingThreads>0)
                    this.wait();
            }
            finally {
                waitingThreads--;
            }
            writingThreads++;
        }

        public synchronized void writeUnlock() {
            writingThreads--;
            preferWrite = false;
            notifyAll();
        }
    }

    readLock()用于獲得讀鎖,readUnlock()釋放讀鎖,writeLock()和writeUnlock()一樣。由于鎖用完必須釋放,因此,必須保證lock和unlock匹配。我們修改DataHandler,加入ReadWriteLock:

    package com.crackj2ee.thread;
    public class DataHandler {
        // store data:
        private char[] buffer = "AAAAAAAAAA".toCharArray();
        // lock:
        private ReadWriteLock lock = new ReadWriteLock();

        public char[] read(String name) throws InterruptedException {
            System.out.println(name + " waiting for read...");
            lock.readLock();
            try {
                char[] data = doRead();
                System.out.println(name + " reads data: " + new String(data));
                return data;
            }
            finally {
                lock.readUnlock();
            }
        }

        public void write(String name, char[] data) throws InterruptedException {
            System.out.println(name + " waiting for write...");
            lock.writeLock();
            try {
                System.out.println(name + " wrote data: " + new String(data));
                doWrite(data);
            }
            finally {
                lock.writeUnlock();
            }
        }

        private char[] doRead() {
            char[] ret = new char[buffer.length];
            for(int i=0; i<buffer.length; i++) {
                ret[i] = buffer[i];
                sleep(3);
            }
            return ret;
        }
        private void doWrite(char[] data) {
            if(data!=null) {
                buffer = new char[data.length];
                for(int i=0; i<buffer.length; i++) {
                    buffer[i] = data[i];
                    sleep(10);
                }
            }
        }
        private void sleep(int ms) {
            try {
                Thread.sleep(ms);
            }
            catch(InterruptedException ie) {}
        }
    }

    public方法read()和write()完全封裝了底層的ReadWriteLock,因此,多線程可以安全地調(diào)用這兩個(gè)方法:

    // ReadingThread不斷讀取數(shù)據(jù):
    package com.crackj2ee.thread;
    public class ReadingThread extends Thread {
        private DataHandler handler;
        public ReadingThread(DataHandler handler) {
            this.handler = handler;
        }
        public void run() {
            for(;;) {
                try {
                    char[] data = handler.read(getName());
                    Thread.sleep((long)(Math.random()*1000+100));
                }
                catch(InterruptedException ie) {
                    break;
                }
            }
        }
    }

    // WritingThread不斷寫入數(shù)據(jù),每次寫入的都是10個(gè)相同的字符:
    package com.crackj2ee.thread;
    public class WritingThread extends Thread {
        private DataHandler handler;
        public WritingThread(DataHandler handler) {
            this.handler = handler;
        }
        public void run() {
            char[] data = new char[10];
            for(;;) {
                try {
                    fill(data);
                    handler.write(getName(), data);
                    Thread.sleep((long)(Math.random()*1000+100));
                }
                catch(InterruptedException ie) {
                    break;
                }
            }
        }
        // 產(chǎn)生一個(gè)A-Z隨機(jī)字符,填入char[10]:
        private void fill(char[] data) {
            char c = (char)(Math.random()*26+'A');
            for(int i=0; i<data.length; i++)
                data[i] = c;
        }
    }

    最后Main負(fù)責(zé)啟動(dòng)這些線程:

    package com.crackj2ee.thread;
    public class Main {
        public static void main(String[] args) {
            DataHandler handler = new DataHandler();
            Thread[] ts = new Thread[] {
                    new ReadingThread(handler),
                    new ReadingThread(handler),
                    new ReadingThread(handler),
                    new ReadingThread(handler),
                    new ReadingThread(handler),
                    new WritingThread(handler),
                    new WritingThread(handler)
            };
            for(int i=0; i<ts.length; i++) {
                ts[i].start();
            }
        }
    }

    我們啟動(dòng)了5個(gè)讀線程和2個(gè)寫線程,運(yùn)行結(jié)果如下:

    Thread-0 waiting for read...
    Thread-1 waiting for read...
    Thread-2 waiting for read...
    Thread-3 waiting for read...
    Thread-4 waiting for read...
    Thread-5 waiting for write...
    Thread-6 waiting for write...
    Thread-4 reads data: AAAAAAAAAA
    Thread-3 reads data: AAAAAAAAAA
    Thread-2 reads data: AAAAAAAAAA
    Thread-1 reads data: AAAAAAAAAA
    Thread-0 reads data: AAAAAAAAAA
    Thread-5 wrote data: EEEEEEEEEE
    Thread-6 wrote data: MMMMMMMMMM
    Thread-1 waiting for read...
    Thread-4 waiting for read...
    Thread-1 reads data: MMMMMMMMMM
    Thread-4 reads data: MMMMMMMMMM
    Thread-2 waiting for read...
    Thread-2 reads data: MMMMMMMMMM
    Thread-0 waiting for read...
    Thread-0 reads data: MMMMMMMMMM
    Thread-4 waiting for read...
    Thread-4 reads data: MMMMMMMMMM
    Thread-2 waiting for read...
    Thread-5 waiting for write...
    Thread-2 reads data: MMMMMMMMMM
    Thread-5 wrote data: GGGGGGGGGG
    Thread-6 waiting for write...
    Thread-6 wrote data: AAAAAAAAAA
    Thread-3 waiting for read...
    Thread-3 reads data: AAAAAAAAAA
    ......

    可以看到,每次讀/寫都是完整的原子操作,因?yàn)槲覀兠看螌懭氲亩际?0個(gè)相同字符。并且,每次讀出的都是最近一次寫入的內(nèi)容。

    如果去掉ReadWriteLock:

    package com.crackj2ee.thread;
    public class DataHandler {

        // store data:
        private char[] buffer = "AAAAAAAAAA".toCharArray();

        public char[] read(String name) throws InterruptedException {
            char[] data = doRead();
            System.out.println(name + " reads data: " + new String(data));
            return data;
        }
        public void write(String name, char[] data) throws InterruptedException {
            System.out.println(name + " wrote data: " + new String(data));
            doWrite(data);
        }

        private char[] doRead() {
            char[] ret = new char[10];
            for(int i=0; i<10; i++) {
                ret[i] = buffer[i];
                sleep(3);
            }
            return ret;
        }
        private void doWrite(char[] data) {
            for(int i=0; i<10; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
        private void sleep(int ms) {
            try {
                Thread.sleep(ms);
            }
            catch(InterruptedException ie) {}
        }
    }

    運(yùn)行結(jié)果如下:

    Thread-5 wrote data: AAAAAAAAAA
    Thread-6 wrote data: MMMMMMMMMM
    Thread-0 reads data: AAAAAAAAAA
    Thread-1 reads data: AAAAAAAAAA
    Thread-2 reads data: AAAAAAAAAA
    Thread-3 reads data: AAAAAAAAAA
    Thread-4 reads data: AAAAAAAAAA
    Thread-2 reads data: MAAAAAAAAA
    Thread-3 reads data: MAAAAAAAAA
    Thread-5 wrote data: CCCCCCCCCC
    Thread-1 reads data: MAAAAAAAAA
    Thread-0 reads data: MAAAAAAAAA
    Thread-4 reads data: MAAAAAAAAA
    Thread-6 wrote data: EEEEEEEEEE
    Thread-3 reads data: EEEEECCCCC
    Thread-4 reads data: EEEEEEEEEC
    Thread-1 reads data: EEEEEEEEEE

    可以看到在Thread-6寫入EEEEEEEEEE的過程中,3個(gè)線程讀取的內(nèi)容是不同的。

    思考

    java的synchronized提供了最底層的物理鎖,要在synchronized的基礎(chǔ)上,實(shí)現(xiàn)自己的邏輯鎖,就必須仔細(xì)設(shè)計(jì)ReadWriteLock。

    Q: lock.readLock()為什么不放入try{ } 內(nèi)?
    A: 因?yàn)閞eadLock()會(huì)拋出InterruptedException,導(dǎo)致readingThreads++不執(zhí)行,而readUnlock()在finally{ } 中,導(dǎo)致readingThreads--執(zhí)行,從而使readingThread狀態(tài)出錯(cuò)。writeLock()也是類似的。

    Q: preferWrite有用嗎?
    A: 如果去掉preferWrite,線程安全不受影響。但是,如果讀取線程很多,上一個(gè)線程還沒有讀取完,下一個(gè)線程又開始讀了,就導(dǎo)致寫入線程長時(shí)間無法獲得writeLock;如果寫入線程等待的很多,一個(gè)接一個(gè)寫,也會(huì)導(dǎo)致讀取線程長時(shí)間無法獲得readLock。preferWrite的作用是讓讀 /寫交替執(zhí)行,避免由于讀線程繁忙導(dǎo)致寫無法進(jìn)行和由于寫線程繁忙導(dǎo)致讀無法進(jìn)行。

    Q: notifyAll()換成notify()行不行?
    A: 不可以。由于preferWrite的存在,如果一個(gè)線程剛讀取完畢,此時(shí)preferWrite=true,再notify(),若恰好喚醒的是一個(gè)讀線程,則while(writingThreads>0 || (preferWrite && waitingThreads>0))可能為true導(dǎo)致該讀線程繼續(xù)等待,而等待寫入的線程也處于wait()中,結(jié)果所有線程都處于wait ()狀態(tài),誰也無法喚醒誰。因此,notifyAll()比notify()要來得安全。程序驗(yàn)證notify()帶來的死鎖:

    Thread-0 waiting for read...
    Thread-1 waiting for read...
    Thread-2 waiting for read...
    Thread-3 waiting for read...
    Thread-4 waiting for read...
    Thread-5 waiting for write...
    Thread-6 waiting for write...
    Thread-0 reads data: AAAAAAAAAA
    Thread-4 reads data: AAAAAAAAAA
    Thread-3 reads data: AAAAAAAAAA
    Thread-2 reads data: AAAAAAAAAA
    Thread-1 reads data: AAAAAAAAAA
    Thread-5 wrote data: CCCCCCCCCC
    Thread-2 waiting for read...
    Thread-1 waiting for read...
    Thread-3 waiting for read...
    Thread-0 waiting for read...
    Thread-4 waiting for read...
    Thread-6 wrote data: LLLLLLLLLL
    Thread-5 waiting for write...
    Thread-6 waiting for write...
    Thread-2 reads data: LLLLLLLLLL
    Thread-2 waiting for read...
    (運(yùn)行到此不動(dòng)了)

    注意到這種死鎖是由于所有線程都在等待別的線程喚醒自己,結(jié)果都無法醒過來。這和兩個(gè)線程希望獲得對方已有的鎖造成死鎖不同。因此多線程設(shè)計(jì)的難度遠(yuǎn)遠(yuǎn)高于單線程應(yīng)用。
    posted on 2005-10-24 22:22 zjw_albert 閱讀(111) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發(fā)表評論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 亚洲日韩乱码久久久久久| 日韩va亚洲va欧洲va国产| 亚洲13又紧又嫩又水多| 国产成人精品免费视频大全麻豆| 亚洲国产精品国自产拍电影| 久久成人免费电影| 亚洲国产一区在线| 亚洲免费观看网站| 亚洲日本久久久午夜精品| 免费观看的毛片手机视频| 日韩精品亚洲专区在线影视| 国产精品无码一区二区三区免费| 国产精品亚洲专区无码不卡| 国产免费私拍一区二区三区| 一级特黄色毛片免费看| 久久91亚洲人成电影网站| 99久久免费精品视频| 中文字幕在线观看亚洲日韩| 免费人成在线观看播放国产| 日本一区二区三区在线视频观看免费 | 亚洲国产天堂久久综合网站| 67194国产精品免费观看| 亚洲另类小说图片| 在线观看成人免费视频| 一级毛片免费在线观看网站| 在线精品亚洲一区二区小说| 暖暖日本免费中文字幕| 亚洲一本之道高清乱码| 国产高清视频在线免费观看| WWW国产成人免费观看视频| 亚洲免费在线播放| 日韩高清在线免费观看| 香蕉视频在线免费看| 亚洲乱码中文论理电影| 免费人成年激情视频在线观看| 日本免费A级毛一片| 亚洲一级毛片在线播放| 亚洲日本在线观看视频| 最近中文字幕高清免费中文字幕mv | 抽搐一进一出gif免费视频| 亚洲精品国产成人|