Posted on 2008-09-01 19:38
dennis 閱讀(2396)
評論(2) 編輯 收藏 所屬分類:
java
這個題目比較怪,聽俺道來。俺一直在負責公司游戲服務器的開發和維護,日積月累下來終于將原本混亂的代碼和結構重構的比較清晰了,在此過程中的體會就是,重構啊,不僅僅是技術活,更多是要克服不情愿的、得過且過的心理去做,去做了才發現麻煩并沒有想象中的大。
改造過程中遇到這么個問題,我想將對某個創建的游戲的操作都固定在一個線程執行,與其他游戲可以并發地處理;或者說依據游戲id派發到某個固定的線程處理,對此游戲的操作都是串行化。不是俺不想徹底并行化,但是要將現有的代碼改造成適應并行化相當困難,俺嘗試的結果是問題百出,因此就想了這么個折中策略,不同游戲之間的操作可以并行,單個游戲內操作串行。怎么派發呢?很簡單的機制,根據id%size結果來處理就好,size就是你準備開的線程數。因此可以很容易地模擬一個生產者消費者模型的線程池,根據游戲id%size的結果將任務塞到隊列中,讓生產者線程順序處理。已經有部分代碼是這樣處理的,不過是自己實現的模型(BlockingQueue),比較不適合俺想要的任務式的處理過程,靈機一動,jdk5引入的線程池不是有個單線程的版本嗎?俺將這個線程池再做個池不就OK了?說起來不好理解,看代碼:
public interface Task extends Runnable {
public int getCode();
}
嗯,定義一個Task接口,繼承Runnable,多了個getCode方法用于決定派發任務到哪個ExecutorService執行。線程池池登場:
public class SingleThreadPoolPool {
private Map<Integer, ExecutorService> threadPoolMap = new HashMap<Integer, ExecutorService>();
private int size;
public SingleThreadPoolPool(int size) {
this.size = size;
for (int i = 0; i < size; i++) {
ExecutorService executor = Executors.newSingleThreadExecutor();
threadPoolMap.put(i, executor);
}
}
public void execute(Task task) {
if (task == null)
return;
threadPoolMap.get(getIndex(task.getCode())).execute(task);
}
public void execute(int code, Runnable r) {
if (r == null)
return;
threadPoolMap.get(getIndex(code)).execute(r);
}
private int getIndex(int code) {
int index = -1;
if (code < 0)
index = 0;
else
index = code % this.size;
return index;
}
public void shutdown() {
for (int i = 0; i < size; i++) {
threadPoolMap.get(i).shutdown();
}
threadPoolMap.clear();
}
public int size() {
return this.size;
}
}
哇靠,這也太簡單了,這就能保證code相同的任務會被排隊順序執行。是啊,很簡單,不是啥高科技,但簡單明了地實現了俺的需求。需要注意的是,只有通過Executor的execute方法提交的任務才會被排到隊列中哦。
補充一個線程安全測試:
import java.util.concurrent.CountDownLatch;
import com.xlands.game.lobby.util.SingleThreadPoolPool;
import junit.framework.TestCase;
class Counter {
int i;
public void incr() {
i++;
}
}
class IncrTask implements Runnable {
Counter counter;
CountDownLatch latch;
public IncrTask(Counter counter, CountDownLatch latch) {
this.counter = counter;
this.latch = latch;
}
public void run() {
try {
counter.incr();
} finally {
latch.countDown();
}
}
}
public class SingleThreadPoolPoolTest extends TestCase {
static final int NUM = 10000;
SingleThreadPoolPool singleThreadPoolPool;
@Override
protected void setUp() throws Exception {
singleThreadPoolPool = new SingleThreadPoolPool(2);
super.setUp();
}
@Override
protected void tearDown() throws Exception {
singleThreadPoolPool.shutdown();
super.tearDown();
}
public void testThreadSafe() throws Exception {
Counter c1 = new Counter();
Counter c2 = new Counter();
assertEquals(singleThreadPoolPool.size(), 2);
CountDownLatch latch = new CountDownLatch(NUM * 2);
for (int i = 0; i < NUM; i++) {
singleThreadPoolPool.execute(0, new IncrTask(c1, latch));
singleThreadPoolPool.execute(1, new IncrTask(c2, latch));
}
latch.await();
assertEquals(NUM, c1.i);
assertEquals(NUM, c2.i);
}
}