ZooKeeper是一個(gè)優(yōu)秀的協(xié)調(diào)服務(wù), 目前是Hadoop的一個(gè)子項(xiàng)目. 我們可以用它來(lái)為我們的服務(wù)提供配置中心, 注冊(cè)中心, 分布式同步鎖, 消息隊(duì)列等服務(wù), 更多信息請(qǐng)瀏覽 http://hadoop.apache.org/zookeeper/
上篇文章中實(shí)現(xiàn)一個(gè)CXF的負(fù)載均衡服務(wù), 本次我們使用ZooKeeper來(lái)為我們的服務(wù)提供動(dòng)態(tài)服務(wù)器列表, 以便把客戶端的調(diào)用分配到各個(gè)有效的服務(wù)上去.
動(dòng)態(tài)更新服務(wù)列表有2種方法
* 定時(shí)去獲取數(shù)據(jù), 更新我們的數(shù)據(jù) --- 通用
* 使用ZooKeeper的watch特性, 有服務(wù)器加入/退出時(shí)我們自動(dòng)獲取通知 --- 適用于有消息通知機(jī)制的
首先我們的HelloService部分要向ZooKeeper注冊(cè)
只有注冊(cè)到ZooKeeper上, 我們才知道你可以提供這個(gè)服務(wù). 在實(shí)際環(huán)境中, 需要每個(gè)服務(wù)都需要向ZooKeeper注冊(cè) ()
注冊(cè)代碼如下:
private void register2Zookeeper(String address) throws Exception
{
ZooKeeper zk = new ZooKeeper(zkAddress, 3000, null);
GroupMemberCenter gmc = new GroupMemberCenter();
gmc.setZooKeeper(zk);
gmc.createAndSetGroup(groupName);
gmc.joinGroupByDefine(address);
System.out.println("register service to zookeeper: " + address);
}
GroupMemberCenter是一個(gè)輔助類, 代碼如下:
/**
* Dynamic member center.
* <p/>
* The member maybe leave or dead dynamiclly.
*
*
* @author: Felix Zhang Date: 2010-9-30 17:58:16
*/
public class GroupMemberCenter
{
public static final String ESCAPE_PREFIX = "|||";
private static final Log log = LogFactory.getLog(GroupMemberCenter.class);
private static final List<String> EMPTY_MEMBERS = new ArrayList<String>(0);
private ZooKeeper zk;
private String group = "";
private String me = "";
public void setZooKeeper(ZooKeeper zk)
{
this.zk = zk;
}
public void setGroup(String groupName)
{
if (groupName != null && groupName.length() > 0)
{
if (!groupName.startsWith("/"))
{
groupName = "/" + groupName;
}
this.group = groupName;
}
}
public boolean createAndSetGroup(String groupName)
{
boolean result = createGroup(groupName);
if (result)
{
setGroup(groupName);
}
return result;
}
public boolean createGroup(String groupName)
{
assert groupName != null;
if (!groupName.startsWith("/"))
{
groupName = "/" + groupName;
}
try
{
Stat s = zk.exists(groupName, false);
if (s == null)
{
zk.create(groupName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
catch (Exception e)
{
log.error("create group error: " + groupName, e);
return false;
}
return true;
}
protected String buildName(String name)
{
return group + "/" + name;
}
public boolean joinGroup()
{
return joinGroup(null);
}
public boolean joinGroup(Integer port)
{
try
{
//use ipaddress as default, if you will use different ipaddress, you need joinGroup(yourip)
me = InetAddress.getLocalHost().getHostAddress();
return joinGroupByDefine(me + ":" + port);
}
catch (Exception e)
{
log.error("join group error", e);
return false;
}
}
public boolean joinGroupByDefine(String userdefine)
{
assert userdefine != null;
assert userdefine.length() > 0;
try
{
me = userdefine;
if (me.contains("[host]"))
{
String host = InetAddress.getLocalHost().getHostAddress();
me = me.replaceFirst("\\[host\\]", host);
}
//if contains "/", how to deal? --- maybe we need more format in future
me = ESCAPE_PREFIX + URLEncoder.encode(me, "UTF-8");
zk.create(buildName(me), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
catch (Exception e)
{
log.error("join group error: " + me, e);
return false;
}
return true;
}
public void leaveGroup()
{
try
{
zk.delete(buildName(me), 0);
}
catch (Exception e)
{
log.error("leave group error: " + me, e);
}
}
public List<String> fetchGroupMembers()
{
return fetchGroupMembers(group, null);
}
public List<String> fetchGroupMembers(String groupName)
{
return fetchGroupMembers(groupName, null);
}
public List<String> fetchGroupMembers(String groupName, Watcher watcher)
{
if (groupName != null && groupName.length() > 0)
{
if (!groupName.startsWith("/"))
{
groupName = "/" + groupName;
}
}
else
{
return EMPTY_MEMBERS;
}
try
{
List<String> childlist;
if(watcher == null)
{
childlist = zk.getChildren(groupName, false);
}
else
{
childlist = zk.getChildren(groupName, watcher);
}
List<String> lastresult = new ArrayList<String>();
for (String item : childlist)
{
if (item.startsWith(ESCAPE_PREFIX))
{
lastresult.add(URLDecoder.decode(item, "UTF-8").substring(3));
}
else
{
lastresult.add(item);
}
}
return lastresult;
}
catch (Exception e)
{
log.error("fetch group members error", e);
return EMPTY_MEMBERS;
}
}
}
GroupMemberCenter主要是把用戶的address信息做一下轉(zhuǎn)義然后在ZooKeeper中創(chuàng)建一個(gè)節(jié)點(diǎn), 注冊(cè)時(shí)使用 CreateMode.EPHEMERAL 模式, 也就是類似心跳監(jiān)測(cè), 如果服務(wù)掛掉, 那么ZooKeeper會(huì)自動(dòng)刪除此節(jié)點(diǎn).
為了方便測(cè)試, 編寫3個(gè)啟動(dòng)服務(wù)的程序來(lái)模擬多臺(tái)機(jī)器, 啟動(dòng)的都是Hello服務(wù), 只是端口不一樣而已:
public class HelloServiceServer5Zookeeper1 {
public static void main(String[] args) throws Exception {
new HelloServicePublisher5Zookeeper().start("http://localhost:8081/service/Hello", new HelloFirstImpl());
}
}
其他2個(gè)請(qǐng)自己看源碼包.
下面我們來(lái)準(zhǔn)備Client, 代碼和上篇文章中的一樣, 首先是一個(gè)XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xmlns:clustering="http://cxf.apache.org/clustering"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="loadBalanceStrategy" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
<property name="removeFailedEndpoint" value="true" />
</bean>
<bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
<property name="strategy" ref="loadBalanceStrategy" />
</bean>
<jaxws:client name="helloClient"
serviceClass="org.javascud.extensions.cxf.service.Hello" >
<jaxws:features>
<ref bean="loadBalanceFeature" />
</jaxws:features>
</jaxws:client>
<bean id="zooKeeper" class="org.apache.zookeeper.ZooKeeper">
<constructor-arg index="0" value="127.0.0.1:2181" />
<constructor-arg index="1" value="3000" />
<constructor-arg index="2" ><null/></constructor-arg>
</bean>
</beans>
XML沒(méi)有寫任何服務(wù)的網(wǎng)址, 后面的程序負(fù)責(zé)更新服務(wù)列表. 此XML定義了一個(gè)ZooKeeper客戶端, 你可以根據(jù)自己的實(shí)際情況修改, 例如ZooKeeper本身也可以是負(fù)載均衡的 (一般為3臺(tái)服務(wù)器, 方便投票).
調(diào)用的Java代碼如下:
ClassPathXmlApplicationContext context
= new ClassPathXmlApplicationContext(new String[]
{"org/javascud/extensions/cxf/zookeeper/client/loadbalance_fail_zookeeper.xml"});
final Hello client = (Hello) context.getBean("helloClient");
final AbstractLoadBalanceStrategy strategy = (AbstractLoadBalanceStrategy) context.getBean("loadBalanceStrategy");
Client myclient = ClientProxy.getClient(client);
String address = myclient.getEndpoint().getEndpointInfo().getAddress();
System.out.println(address);
ZooKeeper zk = (ZooKeeper) context.getBean("zooKeeper");
//使用定時(shí)刷新的方式更新服務(wù)列表: 實(shí)際代碼中可以寫一個(gè)單獨(dú)的類來(lái)調(diào)用
ServiceEndpointsFetcher fetcher = new ServiceEndpointsFetcher();
fetcher.setStrategy(strategy);
fetcher.setZooKeeper(zk);
fetcher.setGroupName(groupName);
fetcher.start();
//調(diào)用服務(wù)
for (int i = 1; i <= 1000; i++) {
String result1 = client.sayHello("Felix" + i);
System.out.println("Call " + i + ": " + result1);
int left = strategy.getAlternateAddresses(null).size();
System.out.println("================== left " + left + " ===========================");
Thread.sleep(100);
}
查看上面的代碼可以發(fā)現(xiàn), 我們使用了ServiceEndpointsFetcher來(lái)刷新, 間隔固定的時(shí)間去獲取最新的服務(wù)列表.
我們還可以采用觀察者方式來(lái)更新服務(wù)列表:
/**
* watcher service from zookeeper.
*
* @author Felix Zhang Date:2010-10-16 01:13
*/
public class ServiceEndpointsWatcher extends ZooKeeperChildrenWatcher {
private AbstractLoadBalanceStrategy strategy;
public void setStrategy(AbstractLoadBalanceStrategy strategy) {
this.strategy = strategy;
}
@Override
protected void updateData(List<String> members) {
strategy.setAlternateAddresses(members);
}
}
ZooKeeperChildrenWatcher是一個(gè)父類, 調(diào)用GroupMemberCenter的代碼來(lái)監(jiān)測(cè)ZooKeeper上的對(duì)應(yīng)節(jié)點(diǎn):
/**
* a Watcher for monitor zookeeper by getChildren
*
* @author Felix Zhang Date:2010-10-16 14:39
*/
public abstract class ZooKeeperChildrenWatcher implements Watcher {
private ZooKeeper zooKeeper;
private String groupName;
private GroupMemberCenter gmc = null;
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Override
public void process(WatchedEvent event) {
fetchAndUpdate();
}
private void fetchAndUpdate() {
//get children and register watcher again
List<String> members = gmc.fetchGroupMembers(groupName, this);
updateData(members);
}
protected abstract void updateData(List<String> members);
public void init() {
if (zooKeeper != null) {
gmc = new GroupMemberCenter();
gmc.setZooKeeper(zooKeeper);
fetchAndUpdate();
}
}
}
調(diào)用ServiceEndpointsWatcher的代碼是在Spring的XML中, 當(dāng)然也可以在單獨(dú)程序中調(diào)用:
<bean id="serviceEndpointsWatcher"
class="org.javascud.extensions.cxf.zookeeper.ServiceEndpointsWatcher"
init-method="init">
<property name="strategy" ref="loadBalanceStrategy" />
<property name="zooKeeper" ref="zooKeeper" />
<property name="groupName" value="helloservice" />
</bean>
ok, 下面我們啟動(dòng)ZooKeeper, 在2181端口. 然后其次啟動(dòng)三個(gè)HelloService: HelloServiceServer5Zookeeper1, HelloServiceServer5Zookeeper2, HelloServiceServer5Zookeeper3, 它們分別監(jiān)測(cè)在8081, 8082, 8083端口, 并且會(huì)向ZooKeeper注冊(cè), 你查看用ZooKeeper的客戶端查看 ls /helloservice, 應(yīng)該可以看到三個(gè)節(jié)點(diǎn).
然后我們運(yùn)行客戶端代碼 HelloClient5Zookeeper, 在客戶端運(yùn)行的過(guò)程中, 我們可以終止/啟動(dòng)HelloService, 就可以看到我們的程序會(huì)動(dòng)態(tài)地訪問(wèn)不同的HelloService, 達(dá)到了負(fù)載均衡的目的.
注: ServiceEndpointsWatcher 或ServiceEndpointsFetcher 一定現(xiàn)行運(yùn)行, 否則調(diào)用服務(wù)的部分會(huì)拋出異常, 因?yàn)闆](méi)有可用的服務(wù)地址.
代碼打包下載: http://cnscud.googlecode.com/files/extensions-20101016.zip
SVN 源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/
轉(zhuǎn)載請(qǐng)注明作者和出處 http://scud.blogjava.net