<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 56,  comments - 12,  trackbacks - 0
    Java5增加了新的類庫并發(fā)集java.util.concurrent,該類庫為并發(fā)程序提供了豐富的API多線程編程在Java 5中更加容易,靈活。本文通過一個網(wǎng)絡(luò)服務(wù)器模型,來實踐Java5的多線程編程,該模型中使用了Java5中的線程池,阻塞隊列,可重入鎖等,還實踐了 Callable, Future等接口,并使用了Java 5的另外一個新特性泛型。

      簡介

      本文將實現(xiàn)一個網(wǎng)絡(luò)服務(wù)器模型,一旦有客戶端連接到該服務(wù)器,則啟動一個新線程為該連接服務(wù),服務(wù)內(nèi)容為往客戶端輸送一些字符信息。一個典型的網(wǎng)絡(luò)服務(wù)器模型如下:

      1. 建立監(jiān)聽端口。

      2. 發(fā)現(xiàn)有新連接,接受連接,啟動線程,執(zhí)行服務(wù)線程。 3. 服務(wù)完畢,關(guān)閉線程。

    這個模型在大部分情況下運行良好,但是需要頻繁的處理用戶請求而每次請求需要的服務(wù)又是簡短的時候,系統(tǒng)會將大量的時間花費在線程的創(chuàng)建銷毀。Java 5的線程池克服了這些缺點。通過對重用線程來執(zhí)行多個任務(wù),避免了頻繁線程的創(chuàng)建與銷毀開銷,使得服務(wù)器的性能方面得到很大提高。因此,本文的網(wǎng)絡(luò)服務(wù)器 模型將如下:

      1. 建立監(jiān)聽端口,創(chuàng)建線程池。

      2. 發(fā)現(xiàn)有新連接,使用線程池來執(zhí)行服務(wù)任務(wù)。

      3. 服務(wù)完畢,釋放線程到線程池。

      下面詳細介紹如何使用Java 5的concurrent包提供的API來實現(xiàn)該服務(wù)器。

      初始化

    初始化包括創(chuàng)建線程池以及初始化監(jiān)聽端口。創(chuàng)建線程池可以通過調(diào)用java.util.concurrent.Executors類里的靜態(tài)方法 newChahedThreadPool或是newFixedThreadPool來創(chuàng)建,也可以通過新建一個 java.util.concurrent.ThreadPoolExecutor實例來執(zhí)行任務(wù)。這里我們采用newFixedThreadPool方 法來建立線程池。

    ExecutorService pool = Executors.newFixedThreadPool(10);

      表示新建了一個線程池,線程池里面有10個線程為任務(wù)隊列服務(wù)。

      使用ServerSocket對象來初始化監(jiān)聽端口。

    private static final int PORT = 19527;
    serverListenSocket = new ServerSocket(PORT);
    serverListenSocket.setReuseAddress(true);
    serverListenSocket.setReuseAddress(true);

      服務(wù)新連接

      當有新連接建立時,accept返回時,將服務(wù)任務(wù)提交給線程池執(zhí)行。

    while(true){
     Socket socket = serverListenSocket.accept();
     pool.execute(new ServiceThread(socket));
    }

      這里使用線程池對象來執(zhí)行線程,減少了每次線程創(chuàng)建和銷毀的開銷。任務(wù)執(zhí)行完畢,線程釋放到線程池。

      服務(wù)任務(wù)

    服務(wù)線程ServiceThread維護一個count來記錄服務(wù)線程被調(diào)用的次數(shù)。每當服務(wù)任務(wù)被調(diào)用一次時,count的值自增1,因此 ServiceThread提供一個increaseCount和getCount的方法,分別將count值自增1和取得該count值。由于可能多個 線程存在競爭,同時訪問count,因此需要加鎖機制,在Java 5之前,我們只能使用synchronized來鎖定。Java 5中引入了性能更加粒度更細的重入鎖ReentrantLock。我們使用ReentrantLock保證代碼線程安全。下面是具體代碼:

    private static ReentrantLock lock = new ReentrantLock ();
    private static int count = 0;
    private int getCount(){
     int ret = 0;
     try{
      lock.lock();
      ret = count;
     }finally{
      lock.unlock();
     }
     return ret;
    }
    private void increaseCount(){
     try{
      lock.lock();
      ++count;
     }finally{
      lock.unlock();
     }
    }

      服務(wù)線程在開始給客戶端打印一個歡迎信息,

    increaseCount();
    int curCount = getCount();
    helloString = "hello, id = " + curCount+"\r\n";
    dos = new DataOutputStream(connectedSocket.getOutputStream());
    dos.write(helloString.getBytes());

    然后使用ExecutorService的submit方法提交一個Callable的任務(wù),返回一個Future接口的引用。這種做法對費時的任務(wù)非 常有效,submit任務(wù)之后可以繼續(xù)執(zhí)行下面的代碼,然后在適當?shù)奈恢每梢允褂肍uture的get方法來獲取結(jié)果,如果這時候該方法已經(jīng)執(zhí)行完畢,則 無需等待即可獲得結(jié)果,如果還在執(zhí)行,則等待到運行完畢。

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future future = executor.submit(new TimeConsumingTask());
    dos.write("let's do soemthing other".getBytes());
    String result = future.get();
    dos.write(result.getBytes());

      其中TimeConsumingTask實現(xiàn)了Callable接口

    class TimeConsumingTask implements Callable {
     public String call() throws Exception {
      System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
      return "ok, here's the result: It takes me lots of time to produce this result";
     }
    }

    這里使用了Java 5的另外一個新特性泛型,聲明TimeConsumingTask的時候使用了String做為類型參數(shù)。必須實現(xiàn)Callable接口的call函數(shù), 其作用類似與Runnable中的run函數(shù),在call函數(shù)里寫入要執(zhí)行的代碼,其返回值類型等同于在類聲明中傳入的類型值。在這段程序中,我們提交了 一個Callable的任務(wù),然后程序不會堵塞,而是繼續(xù)執(zhí)行dos.write("let's do soemthing other".getBytes());當程序執(zhí)行到String result = future.get()時如果call函數(shù)已經(jīng)執(zhí)行完畢,則取得返回值,如果還在執(zhí)行,則等待其執(zhí)行完畢。

     

    服務(wù)器端的完整實現(xiàn)

      服務(wù)器端的完整實現(xiàn)代碼如下:

    package com.andrew;

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.Serializable;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class Server {
     private static int produceTaskSleepTime = 100;
     private static int consuskSleepTime = 1200;
     private static int produceTaskMaxNumber = 100;
     private static final int CORE_POOL_SIZE = 2;
     private static final int MAX_POOL_SIZE = 100;
     private static final int KEEPALIVE_TIME = 3;
     private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
     private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
     private static final String HOST = "127.0.0.1";
     private static final int PORT = 19527;
     private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
     //private ThreadPoolExecutor serverThreadPool = null;
     private ExecutorService pool = null;
     private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
     private ServerSocket serverListenSocket = null;
     private int times = 5;
     public void start() {
      // You can also init thread pool in this way.
      /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
      MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
      rejectedExecutionHandler);*/
      pool = Executors.newFixedThreadPool(10);
      try {
       serverListenSocket = new ServerSocket(PORT);
       serverListenSocket.setReuseAddress(true);

       System.out.println("I'm listening");
       while (times-- > 0) {
        Socket socket = serverListenSocket.accept();
        String welcomeString = "hello";
        //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
        pool.execute(new ServiceThread(socket));
       }
      } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
      }
      cleanup();
     }

     public void cleanup() {
      if (null != serverListenSocket) {
       try {
        serverListenSocket.close();
       } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
       }
      }
      //serverThreadPool.shutdown();
      pool.shutdown();
     }

     public static void main(String args[]) {
      Server server = new Server();
      server.start();
     }
    }

    class ServiceThread implements Runnable, Serializable {
     private static final long serialVersionUID = 0;
     private Socket connectedSocket = null;
     private String helloString = null;
     private static int count = 0;
     private static ReentrantLock lock = new ReentrantLock();

     ServiceThread(Socket socket) {
      connectedSocket = socket;
     }

     public void run() {
      increaseCount();
      int curCount = getCount();
      helloString = "hello, id = " + curCount + "\r\n";

      ExecutorService executor = Executors.newSingleThreadExecutor();
      Future future = executor.submit(new TimeConsumingTask());

      DataOutputStream dos = null;
      try {
       dos = new DataOutputStream(connectedSocket.getOutputStream());
       dos.write(helloString.getBytes());
       try {
        dos.write("let's do soemthing other.\r\n".getBytes());
        String result = future.get();
        dos.write(result.getBytes());
       } catch (InterruptedException e) {
        e.printStackTrace();
       } catch (ExecutionException e) {
        e.printStackTrace();
       }
      } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
      } finally {
       if (null != connectedSocket) {
        try {
         connectedSocket.close();
        } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
       }
       if (null != dos) {
        try {
         dos.close();
        } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
       }
       executor.shutdown();
      }
     }

     private int getCount() {
      int ret = 0;
      try {
       lock.lock();
       ret = count;
      } finally {
       lock.unlock();
      }
      return ret;
     }

     private void increaseCount() {
      try {
       lock.lock();
       ++count;
      } finally {
       lock.unlock();
      }
     }
    }

    class TimeConsumingTask implements Callable {
     public String call() throws Exception {
      System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
      return "ok, here's the result: It takes me lots of time to produce this result";
     }

    }
      運行程序

      運行服務(wù)端,客戶端只需使用telnet 127.0.0.1 19527 即可看到信息如下:
    posted on 2007-01-19 00:08 苦笑枯 閱讀(241) 評論(0)  編輯  收藏 所屬分類: Java
    收藏來自互聯(lián)網(wǎng),僅供學習。若有侵權(quán),請與我聯(lián)系!

    <2007年1月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿(2)

    隨筆分類(56)

    隨筆檔案(56)

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 在线a人片天堂免费观看高清 | 亚洲一区二区三区精品视频| 中国videos性高清免费| 又大又粗又爽a级毛片免费看| 亚洲欧洲专线一区| 日韩一级免费视频| 国产精品亚洲一区二区三区在线观看| 日韩免费视频在线观看| 亚洲AV无码精品国产成人| 四虎影视在线永久免费看黄| 美女被艹免费视频| jlzzjlzz亚洲乱熟在线播放| 二个人看的www免费视频| 国产日韩亚洲大尺度高清| 久久综合九色综合97免费下载 | 拍拍拍又黄又爽无挡视频免费| 亚洲精品国产日韩| 日本一区二区三区日本免费| 国产精品亚洲一区二区在线观看| 免费成人午夜视频| 中文字幕免费在线观看动作大片| 国产亚洲精品va在线| 国产91色综合久久免费| 亚洲欧洲国产综合AV无码久久| 亚洲第一区精品日韩在线播放| 国产免费一区二区三区免费视频| 久久精品亚洲综合专区| 91在线视频免费看| 欧洲美女大片免费播放器视频| 亚洲AV综合色区无码一区| 亚洲电影免费观看| 国产精品亚洲一区二区三区在线观看 | 欧洲亚洲综合一区二区三区 | 亚洲AV无码AV日韩AV网站| 久久久久亚洲av毛片大| 国产精品国产免费无码专区不卡 | 亚洲综合精品网站| 亚洲视频免费在线看| 日韩成人精品日本亚洲| 亚洲一本综合久久| 国产精品免费看久久久无码|