一、需求
前段時間做了一個項目,在后臺有很多的數(shù)據(jù)都放入到了cache中了,而且還會對cache中的數(shù)據(jù)進行更新。如果只有一臺server沒有任何問題,但是如果考慮到集群負載平衡,連接多個server的時候,就有問題出現(xiàn)了,怎么樣才能保證多個server之間cache的同步呢?請看下面的部署圖。

二、引入JGroups
JGroups是一個可靠的組間通訊工具,進程可以加入一個通訊組,給組內(nèi)所有的成員或單獨的成員發(fā)送消息,同樣,也可以從組中的成員處接收消息。
系統(tǒng)會記錄組的每一個成員,在新成員加入或是現(xiàn)有的成員離開或是崩潰時,會通知組內(nèi)的其他成員。
當我們更新一臺server上的cache的時候,利用JGroups進行廣播,其他的server接收到廣播,根據(jù)接收到的信息來更新自己的cache,這樣達到了
每個server的cache同步。
三、實現(xiàn)
1、定義一個接口BaseCache規(guī)定出對cache類操作的方法
1
public interface BaseCache
{
2
3
public void put(String key, Object ob);
4
public Object get(String key);
5
6
public void delete(String key);
7
public void batchDelete(String[] list);
8
public void batchDelete(List list);
9
public void deleteAll();
10
11
}
2、定義一個同步器(CacheSynchronizer),這個類利用JGroups進行發(fā)送廣播和接收廣播
1
public class CacheSynchronizer
{
2
private static String protocolStackString=
3
"UDP(mcast_addr=235.11.17.19;mcast_port=32767;ip_ttl=3;"+
4
"mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"+
5
"PING(timeout=2000;num_initial_members=3):"+
6
"MERGE2(min_interval=5000;max_interval=10000):"+
7
"FD_SOCK:"+
8
"VERIFY_SUSPECT(timeout=1500):"+
9
"pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"+
10
"pbcast.STABLE(desired_avg_gossip=20000):"+
11
"UNICAST(timeout=2500,5000):"+
12
"FRAG:"+
13
"pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false)";
14
private static String groupName="YHPTEST";
15
16
private Channel jgroupsChannel=null;
17
18
//inner class ,定義接收廣播,已經(jīng)對接收到的廣播進行處理
19
private class ReceiveCallback extends ExtendedReceiverAdapter
{
20
private BaseCache cache=null;
21
public void setCache(BaseCache baseCache)
{//設(shè)置cache類
22
cache=baseCache;
23
}
24
public void receive(Message msg)
{
25
if(cache==null) return ;
26
String strMsg = (String) msg.getObject();
27
if(strMsg!=null&&(!"".equals(strMsg)))
{
28
cache.put(strMsg, strMsg); //根據(jù)接收到的廣播,同步cache
29
}
30
}
31
}
32
33
private ReceiveCallback recvCallback = null;
34
35
public CacheSynchronizer(BaseCache cache) throws Exception
{
36
jgroupsChannel = new JChannel(protocolStackString);
37
recvCallback = new ReceiveCallback();
38
recvCallback.setCache(cache);
39
jgroupsChannel.setReceiver(recvCallback);
40
jgroupsChannel.connect(groupName);
41
}
42
43
/** *//**
44
* 發(fā)送廣播信息,我們可以自定義廣播的格式。
45
* 這里簡單起見,僅僅發(fā)送一個字符串
46
* @param sendMsg
47
* @throws Exception
48
*/
49
public void sendCacheFlushMessage(String sendMsg)throws Exception
{
50
jgroupsChannel.send(null, null, sendMsg); //發(fā)送廣播
51
52
}
53
}
3、定義cache類,調(diào)用同步器同步cache
1
public class TestDataCache implements BaseCache
{
2
private Map dataCache=null;//保持cache數(shù)據(jù)
3
private CacheSynchronizer cacheSyncer = null; //同步器
4
5
//inner class for thread safe.
6
private static final class TestDataCacheHold
{
7
private static TestDataCache theSingleton=new TestDataCache();
8
public static TestDataCache getSingleton()
{
9
return theSingleton;
10
}
11
private TestDataCacheHold()
{}
12
}
13
14
//Prevents to inherit
15
private TestDataCache()
{
16
dataCache=new HashMap();
17
createSynchronizer();
18
}
19
20
public static TestDataCache getInstance()
{
21
return TestDataCacheHold.getSingleton();
22
}
23
24
public CacheSynchronizer getSynchronizer()
{
25
return cacheSyncer;
26
}
27
28
public int getCacheLength()
{
29
return dataCache.size();
30
}
31
32
public void createSynchronizer()
{
33
try
{
34
cacheSyncer=new CacheSynchronizer(this);
35
}catch(Exception e)
{
36
e.printStackTrace();
37
}
38
}
39
40
public void batchDelete(String[] list)
{
41
if(list!=null) return ;
42
synchronized (dataCache)
{
43
for(int i=0;i<list.length;i++)
{
44
if(list[i].length()>0)
{
45
dataCache.remove(list[i]);
46
}
47
}
48
}
49
50
}
51
52
public void batchDelete(List list)
{
53
synchronized (dataCache)
{
54
Iterator itor=list.iterator();
55
while(itor.hasNext())
{
56
String tmpKey=(String)itor.next();
57
if(tmpKey.length()>0)
{
58
dataCache.remove(tmpKey);
59
}
60
}
61
}
62
}
63
64
public void delete(String key)
{
65
synchronized (dataCache)
{
66
dataCache.remove(key);
67
}
68
}
69
70
public void deleteAll()
{
71
synchronized (dataCache)
{
72
dataCache.clear();
73
}
74
75
}
76
77
public Object get(String key)
{
78
Object theObj=null;
79
synchronized (dataCache)
{
80
theObj =dataCache.get(key);
81
}
82
return theObj;
83
}
84
85
public void put(String key, Object obj)
{
86
Object theObj=null;
87
synchronized (dataCache)
{
88
theObj=dataCache.get(key);
89
if(theObj==null)
{
90
dataCache.put(key, obj);
91
}else
{
92
theObj=obj;
93
}
94
}
95
}
96
97
}
98
4、更新cache,測試是否同步
1
Scanner cin=new Scanner(System.in);
2
String input=cin.next().trim();
3
TestDataCache cache=TestDataCache.getInstance();
4
while(!"q".equalsIgnoreCase(input))
{
5
if(!"".equals(input))
{
6
cache.put(input, input);
7
cache.getSynchronizer().sendCacheFlushMessage(input);
8
}
9
System.out.println(cache.getCacheLength());
10
input=cin.next();
11
}
我打開兩個Eclipse,run此程序,輸入測試數(shù)據(jù),控制臺顯示同步后cache的長度。
下面是相應的類圖:

四、引申
在此實例中,我們?yōu)楹唵纹鹨妰H僅考慮了新增對cache的同步,如果是個真正的項目,這顯然是不夠的。這樣我們就必須定義出消息的格式,例如操作的對象,操作的命令等等。根據(jù)消息的定義來執(zhí)行同步數(shù)據(jù)的操作。
五、附錄
Multicast是一種同時像多臺機器發(fā)送數(shù)據(jù)的機制。
Multicast使用224.0.0.0 到 239.255.255.255 這段IP來傳送數(shù)據(jù),這段IP地址是保留的,發(fā)送到這上面的數(shù)據(jù)不會通過你的子網(wǎng)轉(zhuǎn)發(fā)。
在RFC-1060中定義了一部分預留的組播地址,使用時應注意不要重復。
一些比較特別的組播地址:(更多內(nèi)容請查看RFC-1060)
1) 224.0.0.0 這個是保留地址,不會被指定到任何的組播組
2) 224.0.0.1 這個地址在所有的主機上被指定為一個永久組播組,這個地址可以用來找到本地子網(wǎng)內(nèi)所有的組播主機。
使用ping 224.0.0.1可以查看這些地址
在一個組播中的所有主機使用一個相同的組播地址,它們被稱為一個組(Group),組中的成員是動態(tài)的,他們可以隨時加入或者離開組。每臺主機可以同時是多個組的成員,也可以不屬于任何一個組。比較特別的是,并不是只有組中的成員才可以給組發(fā)送數(shù)據(jù)。
組分為兩種,一種是永久性的,一種是動態(tài)的。對于永久性的組,他們擁有一個眾所周知的管理IP地址,這個地址不是組中的成員,它是永久的。永久性的組可以擁有任何數(shù)量的成員,甚至沒有成員。而動態(tài)組只有在擁有成員的時候才存在。JGroups使用的就是動態(tài)組來實現(xiàn)組播數(shù)據(jù)的。
六、參考
http://renex.spaces.live.com/blog/cns!93BE33C757C385AE!280.entry?_c=BlogParthttp://puras.javaeye.com/blog/81783 七、備注
在linux下,如果要調(diào)用JGroups,啟動Tomcat時,必須修改catalina.sh,增加下面的參數(shù):
JAVA_OPTS=" -Djava.net.preferIPv4Stack=true "
請參考下面的鏈接:
http://weblogs.java.net/blog/dcengija/archive/2006/04/jgroups_demos_o.html