一、需求
前段時間做了一個項目,在后臺有很多的數據都放入到了cache中了,而且還會對cache中的數據進行更新。如果只有一臺server沒有任何問題,但是如果考慮到集群負載平衡,連接多個server的時候,就有問題出現了,怎么樣才能保證多個server之間cache的同步呢?請看下面的部署圖。

二、引入JGroups
JGroups是一個可靠的組間通訊工具,進程可以加入一個通訊組,給組內所有的成員或單獨的成員發送消息,同樣,也可以從組中的成員處接收消息。
系統會記錄組的每一個成員,在新成員加入或是現有的成員離開或是崩潰時,會通知組內的其他成員。

當我們更新一臺server上的cache的時候,利用JGroups進行廣播,其他的server接收到廣播,根據接收到的信息來更新自己的cache,這樣達到了
每個server的cache同步。

三、實現
1、定義一個接口BaseCache規定出對cache類操作的方法

 1public interface BaseCache {
 2    
 3    public void put(String key, Object ob);
 4    public Object get(String key);
 5    
 6    public void delete(String key);
 7    public void batchDelete(String[] list);
 8    public void batchDelete(List list);
 9    public void deleteAll();
10    
11}


2、定義一個同步器(CacheSynchronizer),這個類利用JGroups進行發送廣播和接收廣播

 1public class CacheSynchronizer {
 2    private static String protocolStackString=
 3        "UDP(mcast_addr=235.11.17.19;mcast_port=32767;ip_ttl=3;"+
 4        "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"+
 5        "PING(timeout=2000;num_initial_members=3):"+
 6        "MERGE2(min_interval=5000;max_interval=10000):"+
 7        "FD_SOCK:"+
 8        "VERIFY_SUSPECT(timeout=1500):"+
 9        "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"+
10        "pbcast.STABLE(desired_avg_gossip=20000):"+
11        "UNICAST(timeout=2500,5000):"+
12        "FRAG:"+
13        "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false)";
14    private static String groupName="YHPTEST";
15    
16    private Channel jgroupsChannel=null;
17    
18    //inner class ,定義接收廣播,已經對接收到的廣播進行處理
19    private class ReceiveCallback extends ExtendedReceiverAdapter{
20        private BaseCache cache=null;
21        public void setCache(BaseCache baseCache){//設置cache類
22            cache=baseCache;
23        }

24        public void receive(Message msg) {
25            if(cache==nullreturn ;
26            String strMsg = (String) msg.getObject();
27            if(strMsg!=null&&(!"".equals(strMsg))){
28                cache.put(strMsg, strMsg);    //根據接收到的廣播,同步cache
29            }

30        }

31    }

32    
33    private ReceiveCallback recvCallback = null;
34    
35    public CacheSynchronizer(BaseCache cache) throws Exception{
36        jgroupsChannel = new JChannel(protocolStackString);
37        recvCallback = new ReceiveCallback();
38        recvCallback.setCache(cache);
39        jgroupsChannel.setReceiver(recvCallback);
40        jgroupsChannel.connect(groupName);
41    }

42    
43    /**
44     * 發送廣播信息,我們可以自定義廣播的格式。
45     * 這里簡單起見,僅僅發送一個字符串
46     * @param sendMsg
47     * @throws Exception
48     */

49    public void sendCacheFlushMessage(String sendMsg)throws Exception {
50        jgroupsChannel.send(nullnull, sendMsg); //發送廣播
51        
52    }

53}


3、定義cache類,調用同步器同步cache

 1public class TestDataCache implements BaseCache {
 2    private Map dataCache=null;//保持cache數據
 3    private CacheSynchronizer cacheSyncer = null//同步器
 4    
 5    //inner class for thread safe.
 6    private static final class TestDataCacheHold{
 7        private static TestDataCache  theSingleton=new TestDataCache();        
 8        public static TestDataCache getSingleton(){
 9            return theSingleton;
10        }

11        private TestDataCacheHold(){}
12    }

13    
14    //Prevents to inherit
15    private TestDataCache(){
16        dataCache=new HashMap();
17        createSynchronizer();
18    }

19    
20    public static TestDataCache getInstance(){
21        return TestDataCacheHold.getSingleton();
22    }

23    
24    public CacheSynchronizer getSynchronizer(){
25        return cacheSyncer;
26    }

27    
28    public int getCacheLength(){
29        return dataCache.size();
30    }

31    
32    public void createSynchronizer(){
33        try{
34            cacheSyncer=new CacheSynchronizer(this);
35        }
catch(Exception e){
36            e.printStackTrace();
37        }

38    }

39    
40    public void batchDelete(String[] list) {
41        if(list!=nullreturn ;
42        synchronized (dataCache){
43            for(int i=0;i<list.length;i++){
44                if(list[i].length()>0){
45                    dataCache.remove(list[i]);
46                }

47            }

48        }

49
50    }

51
52    public void batchDelete(List list) {
53        synchronized (dataCache){
54            Iterator itor=list.iterator();
55            while(itor.hasNext()){
56                String tmpKey=(String)itor.next();
57                if(tmpKey.length()>0){
58                    dataCache.remove(tmpKey);
59                }

60            }

61        }

62    }

63
64    public void delete(String key) {
65        synchronized (dataCache) {
66            dataCache.remove(key);
67        }

68    }

69
70    public void deleteAll() {
71        synchronized (dataCache){
72            dataCache.clear();
73        }

74
75    }

76
77    public Object get(String key) {
78        Object theObj=null;
79        synchronized (dataCache) {
80            theObj =dataCache.get(key);
81        }

82        return theObj;
83    }

84
85    public void put(String key, Object obj) {
86        Object theObj=null;
87        synchronized (dataCache){
88            theObj=dataCache.get(key);
89            if(theObj==null){
90                dataCache.put(key, obj);
91            }
else{
92                theObj=obj;
93            }

94        }

95    }

96
97}

98

4、更新cache,測試是否同步

 1Scanner    cin=new    Scanner(System.in);
 2        String input=cin.next().trim();
 3        TestDataCache cache=TestDataCache.getInstance();
 4        while(!"q".equalsIgnoreCase(input)){            
 5            if(!"".equals(input)){
 6                cache.put(input, input);
 7                cache.getSynchronizer().sendCacheFlushMessage(input);
 8            }
            
 9            System.out.println(cache.getCacheLength());
10            input=cin.next();
11        }

我打開兩個Eclipse,run此程序,輸入測試數據,控制臺顯示同步后cache的長度。
下面是相應的類圖:

四、引申
在此實例中,我們為簡單起見僅僅考慮了新增對cache的同步,如果是個真正的項目,這顯然是不夠的。這樣我們就必須定義出消息的格式,例如操作的對象,操作的命令等等。根據消息的定義來執行同步數據的操作。

五、附錄
Multicast是一種同時像多臺機器發送數據的機制。
Multicast使用224.0.0.0 到 239.255.255.255 這段IP來傳送數據,這段IP地址是保留的,發送到這上面的數據不會通過你的子網轉發。
在RFC-1060中定義了一部分預留的組播地址,使用時應注意不要重復。

一些比較特別的組播地址:(更多內容請查看RFC-1060)
1) 224.0.0.0 這個是保留地址,不會被指定到任何的組播組
2) 224.0.0.1 這個地址在所有的主機上被指定為一個永久組播組,這個地址可以用來找到本地子網內所有的組播主機。
使用ping 224.0.0.1可以查看這些地址

在一個組播中的所有主機使用一個相同的組播地址,它們被稱為一個組(Group),組中的成員是動態的,他們可以隨時加入或者離開組。每臺主機可以同時是多個組的成員,也可以不屬于任何一個組。比較特別的是,并不是只有組中的成員才可以給組發送數據。
組分為兩種,一種是永久性的,一種是動態的。對于永久性的組,他們擁有一個眾所周知的管理IP地址,這個地址不是組中的成員,它是永久的。永久性的組可以擁有任何數量的成員,甚至沒有成員。而動態組只有在擁有成員的時候才存在。JGroups使用的就是動態組來實現組播數據的。

六、參考

http://renex.spaces.live.com/blog/cns!93BE33C757C385AE!280.entry?_c=BlogPart
http://puras.javaeye.com/blog/81783

七、備注
在linux下,如果要調用JGroups,啟動Tomcat時,必須修改catalina.sh,增加下面的參數:
JAVA_OPTS=" -Djava.net.preferIPv4Stack=true  "
請參考下面的鏈接:
http://weblogs.java.net/blog/dcengija/archive/2006/04/jgroups_demos_o.html