<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Vincent.Chan‘s Blog

    常用鏈接

    統(tǒng)計

    積分與排名

    網(wǎng)站

    最新評論

    Java5 多線程實(shí)踐

    文檔選項(xiàng)
    將此頁作為電子郵件發(fā)送

    將此頁作為電子郵件發(fā)送

    未顯示需要 JavaScript 的文檔選項(xiàng)


    對此頁的評價

    幫助我們改進(jìn)這些內(nèi)容


    張黃矚 , 軟件工程師

    2006 年 1 月 18 日

    Java5 增加了新的類庫并發(fā)集java.util.concurrent,該類庫為并發(fā)程序提供了豐富的API多線程編程在Java 5中更加容易,靈活。本文通過一個網(wǎng)絡(luò)服務(wù)器模型,來實(shí)踐Java5的多線程編程,該模型中使用了Java5中的線程池,阻塞隊列,可重入鎖等,還實(shí)踐了 Callable, Future等接口,并使用了Java 5的另外一個新特性泛型。

    簡介

    本文將實(shí)現(xiàn)一個網(wǎng)絡(luò)服務(wù)器模型,一旦有客戶端連接到該服務(wù)器,則啟動一個新線程為該連接服務(wù),服務(wù)內(nèi)容為往客戶端輸送一些字符信息。一個典型的網(wǎng)絡(luò)服務(wù)器模型如下:

    1. 建立監(jiān)聽端口。

    2. 發(fā)現(xiàn)有新連接,接受連接,啟動線程,執(zhí)行服務(wù)線程。

    3. 服務(wù)完畢,關(guān)閉線程。

    這 個模型在大部分情況下運(yùn)行良好,但是需要頻繁的處理用戶請求而每次請求需要的服務(wù)又是簡短的時候,系統(tǒng)會將大量的時間花費(fèi)在線程的創(chuàng)建銷毀。Java 5的線程池克服了這些缺點(diǎn)。通過對重用線程來執(zhí)行多個任務(wù),避免了頻繁線程的創(chuàng)建與銷毀開銷,使得服務(wù)器的性能方面得到很大提高。因此,本文的網(wǎng)絡(luò)服務(wù)器 模型將如下:

    1. 建立監(jiān)聽端口,創(chuàng)建線程池。

    2. 發(fā)現(xiàn)有新連接,使用線程池來執(zhí)行服務(wù)任務(wù)。

    3. 服務(wù)完畢,釋放線程到線程池。

    下面詳細(xì)介紹如何使用Java 5的concurrent包提供的API來實(shí)現(xiàn)該服務(wù)器。



    回頁首


    初始化

    初 始化包括創(chuàng)建線程池以及初始化監(jiān)聽端口。 創(chuàng)建線程池可以通過調(diào)用java.util.concurrent.Executors類里的靜態(tài)方法newChahedThreadPool或是 newFixedThreadPool來創(chuàng)建,也可以通過新建一個java.util.concurrent.ThreadPoolExecutor實(shí)例 來執(zhí)行任務(wù)。這里我們采用newFixedThreadPool方法來建立線程池。

    ExecutorService pool = Executors.newFixedThreadPool(10);
    表示新建了一個線程池,線程池里面有10個線程為任務(wù)隊列服務(wù)。

    使用ServerSocket對象來初始化監(jiān)聽端口。




    private static final int PORT = 19527;
    serverListenSocket = new ServerSocket(PORT);
    serverListenSocket.setReuseAddress(true);
    serverListenSocket.setReuseAddress(true);



    回頁首


    服務(wù)新連接

    當(dāng)有新連接建立時,accept返回時,將服務(wù)任務(wù)提交給線程池執(zhí)行。





    while(true){
    Socket socket = serverListenSocket.accept();
    pool.execute(new ServiceThread(socket));

    }

    這里使用線程池對象來執(zhí)行線程,減少了每次線程創(chuàng)建和銷毀的開銷。任務(wù)執(zhí)行完畢,線程釋放到線程池。



    回頁首


    服務(wù)任務(wù)

    服 務(wù)線程ServiceThread維護(hù)一個count來記錄服務(wù)線程被調(diào)用的次數(shù)。每當(dāng)服務(wù)任務(wù)被調(diào)用一次時,count的值自增1,因此 ServiceThread提供一個increaseCount和getCount的方法,分別將count值自增1和取得該count值。由于可能多個 線程存在競爭,同時訪問count,因此需要加鎖機(jī)制,在Java 5之前,我們只能使用synchronized來鎖定。Java 5中引入了性能更加粒度更細(xì)的重入鎖ReentrantLock。我們使用ReentrantLock保證代碼線程安全。下面是具體代碼:





    private static ReentrantLock lock = new ReentrantLock ();
    private static int count = 0;
    private int getCount(){
    int ret = 0;
    try{
    lock.lock();
    ret = count;
    }finally{
    lock.unlock();
    }
    return ret;
    }
    private void increaseCount(){
    try{
    lock.lock();
    ++count;
    }finally{
    lock.unlock();
    }
    }

    服務(wù)線程在開始給客戶端打印一個歡迎信息,





    increaseCount();
    int curCount = getCount();
    helloString = "hello, id = " + curCount+"\r\n";
    dos = new DataOutputStream(connectedSocket.getOutputStream());
    dos.write(helloString.getBytes());

    然后使用ExecutorService的submit 方法提交一個Callable的任務(wù),返回一個Future接口的引用。這種做法對費(fèi)時的任務(wù)非常有效,submit任務(wù)之后可以繼續(xù)執(zhí)行下面的代碼,然 后在適當(dāng)?shù)奈恢每梢允褂肍uture的get方法來獲取結(jié)果,如果這時候該方法已經(jīng)執(zhí)行完畢,則無需等待即可獲得結(jié)果,如果還在執(zhí)行,則等待到運(yùn)行完畢。





    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future <String> future = executor.submit(new TimeConsumingTask());
    dos.write("let's do soemthing other".getBytes());
    String result = future.get();
    dos.write(result.getBytes());
    其中TimeConsumingTask實(shí)現(xiàn)了Callable接口
    class TimeConsumingTask implements Callable <String>{
    public String call() throws Exception {
    System.out.println
    ("It's a time-consuming task,
    you'd better retrieve your result in the furture");
    return "ok, here's the result: It takes me lots of time to produce this result";
    }
    }

    這里使用了Java 5的另外一個新特性泛型,聲明TimeConsumingTask的時候使用了String做為類型參數(shù)。必須實(shí)現(xiàn)Callable接口的call函數(shù), 其作用類似與Runnable中的run函數(shù),在call函數(shù)里寫入要執(zhí)行的代碼,其返回值類型等同于在類聲明中傳入的類型值。在這段程序中,我們提交了 一個Callable的任務(wù),然后程序不會堵塞,而是繼續(xù)執(zhí)行dos.write("let's do soemthing other".getBytes());當(dāng)程序執(zhí)行到String result = future.get()時如果call函數(shù)已經(jīng)執(zhí)行完畢,則取得返回值,如果還在執(zhí)行,則等待其執(zhí)行完畢。



    回頁首


    服務(wù)器端的完整實(shí)現(xiàn)

    服務(wù)器端的完整實(shí)現(xiàn)代碼如下:





    package com.andrew;

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.Serializable;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class Server {

    private static int produceTaskSleepTime = 100;

    private static int consumeTaskSleepTime = 1200;

    private static int produceTaskMaxNumber = 100;

    private static final int CORE_POOL_SIZE = 2;

    private static final int MAX_POOL_SIZE = 100;

    private static final int KEEPALIVE_TIME = 3;

    private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;

    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    private static final String HOST = "127.0.0.1";

    private static final int PORT = 19527;

    private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(
    QUEUE_CAPACITY);

    //private ThreadPoolExecutor serverThreadPool = null;

    private ExecutorService pool = null;

    private RejectedExecutionHandler rejectedExecutionHandler = new
    ThreadPoolExecutor.DiscardOldestPolicy();

    private ServerSocket serverListenSocket = null;

    private int times = 5;

    public void start() {
    // You can also init thread pool in this way.
    /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
    MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
    rejectedExecutionHandler);*/
    pool = Executors.newFixedThreadPool(10);
    try {
    serverListenSocket = new ServerSocket(PORT);
    serverListenSocket.setReuseAddress(true);

    System.out.println("I'm listening");
    while (times-- > 0) {
    Socket socket = serverListenSocket.accept();
    String welcomeString = "hello";
    //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
    pool.execute(new ServiceThread(socket));
    }
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    cleanup();
    }

    public void cleanup() {
    if (null != serverListenSocket) {
    try {
    serverListenSocket.close();
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    //serverThreadPool.shutdown();
    pool.shutdown();
    }

    public static void main(String args[]) {
    Server server = new Server();
    server.start();
    }
    }

    class ServiceThread implements Runnable, Serializable {
    private static final long serialVersionUID = 0;

    private Socket connectedSocket = null;

    private String helloString = null;

    private static int count = 0;

    private static ReentrantLock lock = new ReentrantLock();

    ServiceThread(Socket socket) {
    connectedSocket = socket;
    }

    public void run() {
    increaseCount();
    int curCount = getCount();
    helloString = "hello, id = " + curCount + "\r\n";

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(new TimeConsumingTask());

    DataOutputStream dos = null;
    try {
    dos = new DataOutputStream(connectedSocket.getOutputStream());
    dos.write(helloString.getBytes());
    try {
    dos.write("let's do soemthing other.\r\n".getBytes());
    String result = future.get();
    dos.write(result.getBytes());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    } finally {
    if (null != connectedSocket) {
    try {
    connectedSocket.close();
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    if (null != dos) {
    try {
    dos.close();
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    executor.shutdown();
    }
    }

    private int getCount() {
    int ret = 0;
    try {
    lock.lock();
    ret = count;
    } finally {
    lock.unlock();
    }
    return ret;
    }

    private void increaseCount() {
    try {
    lock.lock();
    ++count;
    } finally {
    lock.unlock();
    }
    }
    }

    class TimeConsumingTask implements Callable<String> {
    public String call() throws Exception {
    System.out
    .println("It's a time-consuming task,
    you'd better retrieve your result in the furture");
    return "ok, here's the result: It takes me lots of time to produce this result";
    }

    }



    回頁首


    運(yùn)行程序

    運(yùn)行服務(wù)端,客戶端只需使用telnet 127.0.0.1 19527 即可看到信息如下:




    回頁首




    回頁首


    關(guān)于作者


    張黃矚 聯(lián)系方式zhanghuangzhu@gmail.com,熟悉 WBI Server Foundation, WPS 6.0,對Java,C/C++編程有濃厚的興趣。

    posted on 2006-02-18 15:05 Vincent.Chen 閱讀(129) 評論(0)  編輯  收藏 所屬分類: Java

    主站蜘蛛池模板: 亚洲色欲啪啪久久WWW综合网| 国产亚洲综合视频| 成年性午夜免费视频网站不卡| 亚洲狠狠色丁香婷婷综合| 亚洲熟妇无码另类久久久| 2021国产精品成人免费视频| 四虎成人免费观看在线网址 | 成人毛片18女人毛片免费96| 亚洲国产欧洲综合997久久| 国产亚洲AV夜间福利香蕉149| 波多野结衣在线免费观看| 免费很黄无遮挡的视频毛片| 久久久久久久亚洲Av无码| 亚洲五月午夜免费在线视频| 国产乱子伦精品免费女| 免费无遮挡无遮羞在线看| 777亚洲精品乱码久久久久久| 麻豆国产精品入口免费观看| 美女无遮挡拍拍拍免费视频 | 国产免费午夜a无码v视频| 巨胸狂喷奶水视频www网站免费| 亚洲一卡二卡三卡四卡无卡麻豆| 亚洲国产黄在线观看| 岛国岛国免费V片在线观看| 久久精品国产亚洲av麻豆蜜芽| 狠狠色婷婷狠狠狠亚洲综合| 成全视频免费高清| 免费无码毛片一区二区APP| 久久亚洲私人国产精品| | 青娱乐免费视频在线观看| 国产亚洲男人的天堂在线观看 | 69视频在线观看免费| 日韩亚洲人成在线综合| 91亚洲自偷在线观看国产馆| 国产V亚洲V天堂无码久久久| 久久久久免费看成人影片| 一级午夜免费视频| 蜜桃传媒一区二区亚洲AV| 国外亚洲成AV人片在线观看| 日本免费人成黄页在线观看视频|