一、UDP Server
項目的需要,需要利用java實現一個udp server,主要的功能是偵聽來自客戶端的udp請求,客戶請求可能是大并發量的,對于每個請求Server端的處理很簡單,處理每個請求的時間大約在1ms左右,但是Server端需要維護一個對立于請求的全局變量Cache,項目本身已經采用Mina架構(http://mina.apache.org/),我要開發的Server作為整個項目的一個模塊,由于之前沒有開發UDP Server,受TCP Server的影響,很自然的想利用多線程來實現,對于每個客戶請求,新建一個線程來處理相應的邏輯,在實現的過程中,利用Mina的Thread Model,實現了一個多線程的UDP Server,但是由于要維護一個全局Cache,需要在各線程之間同步,加之處理請求的時間很短,很快就發現在此利用多線程,并不能提高性能,于是決定采用單線程來實現,在動手之前,還是發現有兩種方案來實現單線程UDP Server:
1) 采用JDK的DatagramSocket和DatagramPacket來實現
public class UDPServerUseJDK extends Thread{
/**
* Constructor
* @param port port used to listen incoming connection
* @throws SocketException error to consturct a DatagramSocket with this port
*/
public MediatorServerUseJDK(int port) throws SocketException{
this("MediatorServerThread", port);
}
/**
* Constructor
* @param name the thread name
* @param port port used to listen incoming connection
* @throws SocketException error to consturct a DatagramSocket with this port
*/
public MediatorServerUseJDK(String name, int port) throws SocketException{
super(name);
socket = new DatagramSocket(port);
System.out.println("Mediator server started on JDK model...");
System.out.println("Socket buffer size: " + socket.getReceiveBufferSize());
}
public void run(){
long startTime = 0;
while(true){
try {
buf = new byte[1024];
// receive request
packet = new DatagramPacket(buf, buf.length);
socket.receive(packet);
.........
}catch (IOException e) {
.........
}
}
}
}
2) 采用Mina的DatagramAcceptor來實現,在創建Exector的時候,只傳遞單個線程
public class MediatorServerUseMina {
private DatagramAcceptor acceptor;
public MediatorServerUseMina() {
}
public void startListener(InetSocketAddress address, IoHandler handler) {
// create an acceptor with a single thread
this.acceptor = new DatagramAcceptor(Executors.newSingleThreadExecutor());
// configure the thread models
DatagramAcceptorConfig acceptorConfig = acceptor.getDefaultConfig();
acceptorConfig.setThreadModel(ThreadModel.MANUAL);
// set the acceptor to reuse the address
acceptorConfig.getSessionConfig().setReuseAddress(true);
// add io filters
DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getFilterChain();
// add CPU-bound job first,
filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new StringCodecFactory()));
try {
// bind
acceptor.bind(address, handler);
System.out.println("Mediator Server started on mina model...");
System.out.println("Socket buffer size: " + acceptorConfig.getSessionConfig().getReceiveBufferSize());
} catch (IOException e) {
System.err.println("Error starting component listener on port(UDP) " + address.getPort() + ": "
+ e.getMessage());
}
}
}
二、Performance Test
為了測試兩個Server的性能,寫了個簡單的測試客戶端
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
/**
* @author Herry Hong
*
*/
public class PerformanceTest {
/** Number of threads to be created */
static final int THREADS = 100;
/** Packets to be sent per thread */
static final int PACKETS = 500;
/** The interval of two packets been sent for each thread */
private static final int INTERVAL = 80;
private static final String DATA = "5a76d93cb435fc54eba0b97156fe38f432a4e1da3a87cce8a222644466ed1317";
private class Sender implements Runnable {
private InetAddress address = null;
private DatagramSocket socket = null;
private String msg = null;
private String name = null;
private int packet_sent = 0;
public Sender(String addr, String msg, String name) throws SocketException,
UnknownHostException {
this.address = InetAddress.getByName(addr);
this.socket = new DatagramSocket();
this.msg = msg;
this.name = name;
}
@Override
public void run() {
// send request
byte[] buf = msg.getBytes();
DatagramPacket packet = new DatagramPacket(buf, buf.length,
address, 8000);
try {
for (int i = 0; i < PerformanceTest.PACKETS; i++) {
socket.send(packet);
packet_sent++;
Thread.sleep(INTERVAL);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println("Thread " + name + " sends " + packet_sent + " packets.");
//System.out.println("Thread " + name + " end!");
}
}
/**
* @param args
*/
public static void main(String[] args) {
if (args.length != 1) {
System.out.println("Usage: java PerformanceTest <hostname>");
return;
}
String msg;
for (int i = 0; i < THREADS; i++) {
if(i % 2 == 0){
msg = i + "_" + (i+1) + ""r"n" + (i+1) + "_" + i + ""r"n" + DATA;
}else{
msg = i + "_" + (i-1) + ""r"n" + (i-1) + "_" + i + ""r"n" + DATA;
}
try {
new Thread(new PerformanceTest().new Sender(args[0], msg, "" + i)).start();
} catch (SocketException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
}
三、測試結果
測試環境:
Server:AMD Athlon(tm) 64 X2 Dual Core Processor 4000+,1G memory
Client:AMD Athlon(tm) 64 X2 Dual Core Processor 4000+,1G memory
在測試的過程中,當INTERVAL設置的太小時,服務器端會出現丟包現象,INTERVAL越小,丟包越嚴重,為了提高Server的性能,特將Socket的ReceiveBufferSize設置成默認大小的兩倍,
對于JDK實現:
public MediatorServerUseJDK(String name, int port) throws SocketException{
super(name);
socket = new DatagramSocket(port);
// set the receive buffer size to double default size
socket.setReceiveBufferSize(socket.getReceiveBufferSize() * 2);
System.out.println("Mediator server started on JDK model...");
System.out.println("Socket buffer size: " + socket.getReceiveBufferSize());
}
對于Mina實現:
DatagramAcceptorConfig acceptorConfig = acceptor.getDefaultConfig();
acceptorConfig.setThreadModel(ThreadModel.MANUAL);
// set the acceptor to reuse the address
acceptorConfig.getSessionConfig().setReuseAddress(true);
// set the receive buffer size to double default size
int recBufferSize = acceptorConfig.getSessionConfig().getReceiveBufferSize();
acceptorConfig.getSessionConfig().setReceiveBufferSize(recBufferSize * 2);
此時,相同的INTERVAL,丟包現象明顯減少。
接下來,再測試不同實現的性能差異:
UDP server started on JDK model...
Socket buffer size: 110592
INTERVAL = 100ms,沒有出現丟包,
Process time: 49988
Process time: 49982
Process time: 49984
Process time: 49986
Process time: 49984
INTERVAL = 80ms,仍然沒有丟包,不管Server是不是初次啟動
Process time: 40006
Process time: 40004
Process time: 40003
Process time: 40005
Process time: 40013
UDP Server started on mina model...
Socket buffer size: 110592
INTERVAL = 80ms,Server初次啟動時,經常會出現丟包,當第一次(指服務器初次啟動時)沒有丟包時,隨后基本不丟包,
Process time: 39973
Process time: 40006
Process time: 40007
Process time: 40008
Process time: 40008
INTERVAL = 100ms,沒有出現丟包
Process time: 49958
Process time: 49985
Process time: 49983
Process time: 49988
四、結論
在該要求下,采用JDK和Mina實現性能相當,但是在Server初次啟動時JDK實現基本不會出現丟包,而Mina實現則在Server初次啟動時經常出現丟包現象,在經歷第一次測試后,兩種實現處理時間相近,請求并發量大概為每ms一個請求時,服務器不會出現丟包。