Java5增加了新的類庫并發(fā)集java.util.concurrent,該類庫為并發(fā)程序提供了豐富的API多線程編程在Java
5中更加容易,靈活。本文通過一個網(wǎng)絡服務器模型,來實踐Java5的多線程編程,該模型中使用了Java5中的線程池,阻塞隊列,可重入鎖等,還實踐了
Callable, Future等接口,并使用了Java 5的另外一個新特性泛型。
簡介
本文將實現(xiàn)一個網(wǎng)絡服務器模型,一旦有客戶端連接到該服務器,則啟動一個新線程為該連接服務,服務內(nèi)容為往客戶端輸送一些字符信息。一個典型的網(wǎng)絡服務器模型如下:
1. 建立監(jiān)聽端口。
2. 發(fā)現(xiàn)有新連接,接受連接,啟動線程,執(zhí)行服務線程。 3. 服務完畢,關閉線程。
這個模型在大部分情況下運行良好,但是需要頻繁的處理用戶請求而每次請求需要的服務又是簡短的時候,系統(tǒng)會將大量的時間花費在線程的創(chuàng)建銷毀。Java
5的線程池克服了這些缺點。通過對重用線程來執(zhí)行多個任務,避免了頻繁線程的創(chuàng)建與銷毀開銷,使得服務器的性能方面得到很大提高。因此,本文的網(wǎng)絡服務器
模型將如下:
1. 建立監(jiān)聽端口,創(chuàng)建線程池。
2. 發(fā)現(xiàn)有新連接,使用線程池來執(zhí)行服務任務。
3. 服務完畢,釋放線程到線程池。
下面詳細介紹如何使用Java 5的concurrent包提供的API來實現(xiàn)該服務器。
初始化
初始化包括創(chuàng)建線程池以及初始化監(jiān)聽端口。創(chuàng)建線程池可以通過調用java.util.concurrent.Executors類里的靜態(tài)方法
newChahedThreadPool或是newFixedThreadPool來創(chuàng)建,也可以通過新建一個
java.util.concurrent.ThreadPoolExecutor實例來執(zhí)行任務。這里我們采用newFixedThreadPool方
法來建立線程池。
ExecutorService pool = Executors.newFixedThreadPool(10);
表示新建了一個線程池,線程池里面有10個線程為任務隊列服務。
使用ServerSocket對象來初始化監(jiān)聽端口。
private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);
服務新連接
當有新連接建立時,accept返回時,將服務任務提交給線程池執(zhí)行。
while(true){
Socket socket = serverListenSocket.accept();
pool.execute(new ServiceThread(socket));
}
這里使用線程池對象來執(zhí)行線程,減少了每次線程創(chuàng)建和銷毀的開銷。任務執(zhí)行完畢,線程釋放到線程池。
服務任務
服務線程ServiceThread維護一個count來記錄服務線程被調用的次數(shù)。每當服務任務被調用一次時,count的值自增1,因此
ServiceThread提供一個increaseCount和getCount的方法,分別將count值自增1和取得該count值。由于可能多個
線程存在競爭,同時訪問count,因此需要加鎖機制,在Java 5之前,我們只能使用synchronized來鎖定。Java
5中引入了性能更加粒度更細的重入鎖ReentrantLock。我們使用ReentrantLock保證代碼線程安全。下面是具體代碼:
private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
int ret = 0;
try{
lock.lock();
ret = count;
}finally{
lock.unlock();
}
return ret;
}
private void increaseCount(){
try{
lock.lock();
++count;
}finally{
lock.unlock();
}
}
服務線程在開始給客戶端打印一個歡迎信息,
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
然后使用ExecutorService的submit方法提交一個Callable的任務,返回一個Future接口的引用。這種做法對費時的任務非
常有效,submit任務之后可以繼續(xù)執(zhí)行下面的代碼,然后在適當?shù)奈恢每梢允褂肍uture的get方法來獲取結果,如果這時候該方法已經(jīng)執(zhí)行完畢,則
無需等待即可獲得結果,如果還在執(zhí)行,則等待到運行完畢。
ExecutorService executor = Executors.newSingleThreadExecutor();
Future
future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());
其中TimeConsumingTask實現(xiàn)了Callable接口
class TimeConsumingTask implements Callable {
public String call() throws Exception {
System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
return "ok, here's the result: It takes me lots of time to produce this result";
}
}
這里使用了Java
5的另外一個新特性泛型,聲明TimeConsumingTask的時候使用了String做為類型參數(shù)。必須實現(xiàn)Callable接口的call函數(shù),
其作用類似與Runnable中的run函數(shù),在call函數(shù)里寫入要執(zhí)行的代碼,其返回值類型等同于在類聲明中傳入的類型值。在這段程序中,我們提交了
一個Callable的任務,然后程序不會堵塞,而是繼續(xù)執(zhí)行dos.write("let's do soemthing
other".getBytes());當程序執(zhí)行到String result =
future.get()時如果call函數(shù)已經(jīng)執(zhí)行完畢,則取得返回值,如果還在執(zhí)行,則等待其執(zhí)行完畢。
服務器端的完整實現(xiàn)
服務器端的完整實現(xiàn)代碼如下:
package com.andrew;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Server {
private static int produceTaskSleepTime = 100;
private static int consuskSleepTime = 1200;
private static int produceTaskMaxNumber = 100;
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 100;
private static final int KEEPALIVE_TIME = 3;
private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final String HOST = "127.0.0.1";
private static final int PORT = 19527;
private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
//private ThreadPoolExecutor serverThreadPool = null;
private ExecutorService pool = null;
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
private ServerSocket serverListenSocket = null;
private int times = 5;
public void start() {
// You can also init thread pool in this way.
/*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
rejectedExecutionHandler);*/
pool = Executors.newFixedThreadPool(10);
try {
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
System.out.println("I'm listening");
while (times-- > 0) {
Socket socket = serverListenSocket.accept();
String welcomeString = "hello";
//serverThreadPool.execute(new ServiceThread(socket, welcomeString));
pool.execute(new ServiceThread(socket));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cleanup();
}
public void cleanup() {
if (null != serverListenSocket) {
try {
serverListenSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//serverThreadPool.shutdown();
pool.shutdown();
}
public static void main(String args[]) {
Server server = new Server();
server.start();
}
}
class ServiceThread implements Runnable, Serializable {
private static final long serialVersionUID = 0;
private Socket connectedSocket = null;
private String helloString = null;
private static int count = 0;
private static ReentrantLock lock = new ReentrantLock();
ServiceThread(Socket socket) {
connectedSocket = socket;
}
public void run() {
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount + "\r\n";
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new TimeConsumingTask());
DataOutputStream dos = null;
try {
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
try {
dos.write("let's do soemthing other.\r\n".getBytes());
String result = future.get();
dos.write(result.getBytes());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (null != connectedSocket) {
try {
connectedSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != dos) {
try {
dos.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
executor.shutdown();
}
}
private int getCount() {
int ret = 0;
try {
lock.lock();
ret = count;
} finally {
lock.unlock();
}
return ret;
}
private void increaseCount() {
try {
lock.lock();
++count;
} finally {
lock.unlock();
}
}
}
class TimeConsumingTask implements Callable {
public String call() throws Exception {
System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
return "ok, here's the result: It takes me lots of time to produce this result";
}
}
運行程序
運行服務端,客戶端只需使用telnet 127.0.0.1 19527 即可看到信息如下:
posted on 2007-01-19 00:08
苦笑枯 閱讀(240)
評論(0) 編輯 收藏 所屬分類:
Java