<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    使用ZooKeeper為CXF或其他服務(wù)動(dòng)態(tài)更新服務(wù)器信息




    ZooKeeper是一個(gè)優(yōu)秀的協(xié)調(diào)服務(wù), 目前是Hadoop的一個(gè)子項(xiàng)目. 我們可以用它來(lái)為我們的服務(wù)提供配置中心, 注冊(cè)中心, 分布式同步鎖, 消息隊(duì)列等服務(wù), 更多信息請(qǐng)瀏覽 http://hadoop.apache.org/zookeeper/

    上篇文章中實(shí)現(xiàn)一個(gè)CXF的負(fù)載均衡服務(wù), 本次我們使用ZooKeeper來(lái)為我們的服務(wù)提供動(dòng)態(tài)服務(wù)器列表, 以便把客戶端的調(diào)用分配到各個(gè)有效的服務(wù)上去.


    動(dòng)態(tài)更新服務(wù)列表有2種方法
      * 定時(shí)去獲取數(shù)據(jù), 更新我們的數(shù)據(jù) --- 通用
      * 使用ZooKeeper的watch特性, 有服務(wù)器加入/退出時(shí)我們自動(dòng)獲取通知 --- 適用于有消息通知機(jī)制的


       首先我們的HelloService部分要向ZooKeeper注冊(cè)
        只有注冊(cè)到ZooKeeper上, 我們才知道你可以提供這個(gè)服務(wù). 在實(shí)際環(huán)境中, 需要每個(gè)服務(wù)都需要向ZooKeeper注冊(cè) ()
       
        注冊(cè)代碼如下:

        private void register2Zookeeper(String address) throws Exception
        {
            ZooKeeper zk 
    = new ZooKeeper(zkAddress, 3000null);

            GroupMemberCenter gmc 
    = new GroupMemberCenter();
            gmc.setZooKeeper(zk);

            gmc.createAndSetGroup(groupName);
            gmc.joinGroupByDefine(address);

            System.out.println(
    "register service to zookeeper: " + address);
        }





        GroupMemberCenter是一個(gè)輔助類, 代碼如下:


        /**
         * Dynamic member center.
         * <p/>
         * The member maybe leave or dead dynamiclly.
         *
         *
         * 
    @author: Felix Zhang  Date: 2010-9-30 17:58:16
         
    */
        
    public class GroupMemberCenter
        {
            
    public static final String ESCAPE_PREFIX = "|||";

            
    private static final Log log = LogFactory.getLog(GroupMemberCenter.class);
            
    private static final List<String> EMPTY_MEMBERS = new ArrayList<String>(0);

            
    private ZooKeeper zk;
            
    private String group = "";

            
    private String me = "";

            
    public void setZooKeeper(ZooKeeper zk)
            {
                
    this.zk = zk;
            }

            
    public void setGroup(String groupName)
            {
                
    if (groupName != null && groupName.length() > 0)
                {
                    
    if (!groupName.startsWith("/"))
                    {
                        groupName 
    = "/" + groupName;
                    }

                    
    this.group = groupName;
                }
            }

            
    public boolean createAndSetGroup(String groupName)
            {
                
    boolean result = createGroup(groupName);

                
    if (result)
                {
                    setGroup(groupName);
                }

                
    return result;
            }

            
    public boolean createGroup(String groupName)
            {
                
    assert groupName != null;

                
    if (!groupName.startsWith("/"))
                {
                    groupName 
    = "/" + groupName;
                }

                
    try
                {
                    Stat s 
    = zk.exists(groupName, false);

                    
    if (s == null)
                    {
                        zk.create(groupName, 
    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                }
                
    catch (Exception e)
                {
                    log.error(
    "create group error: " + groupName, e);
                    
    return false;
                }
                
    return true;
            }

            
    protected String buildName(String name)
            {
                
    return group + "/" + name;
            }

            
    public boolean joinGroup()
            {
                
    return joinGroup(null);
            }

            
    public boolean joinGroup(Integer port)
            {
                
    try
                {
                    
    //use ipaddress as default, if you will use different ipaddress, you need joinGroup(yourip)
                    me = InetAddress.getLocalHost().getHostAddress();
                    
    return joinGroupByDefine(me + ":" + port);
                }
                
    catch (Exception e)
                {
                    log.error(
    "join group error", e);
                    
    return false;
                }
            }

            
    public boolean joinGroupByDefine(String userdefine)
            {
                
    assert userdefine != null;
                
    assert userdefine.length() > 0;

                
    try
                {
                    me 
    = userdefine;
                    
    if (me.contains("[host]"))
                    {
                        String host 
    = InetAddress.getLocalHost().getHostAddress();
                        me 
    = me.replaceFirst("\\[host\\]", host);
                    }

                    
    //if contains "/", how to deal?      --- maybe we need more format in future
                    me = ESCAPE_PREFIX + URLEncoder.encode(me, "UTF-8");

                    zk.create(buildName(me), 
    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                }
                
    catch (Exception e)
                {
                    log.error(
    "join group error: " + me, e);
                    
    return false;
                }

                
    return true;
            }

            
    public void leaveGroup()
            {
                
    try
                {
                    zk.delete(buildName(me), 
    0);
                }
                
    catch (Exception e)
                {
                    log.error(
    "leave group error: " + me, e);
                }
            }

            
    public List<String> fetchGroupMembers()
            {
                
    return fetchGroupMembers(group, null);
            }

            
    public List<String> fetchGroupMembers(String groupName)
            {
                
    return fetchGroupMembers(groupName, null);
            }

            
    public List<String> fetchGroupMembers(String groupName, Watcher watcher)
            {
                
    if (groupName != null && groupName.length() > 0)
                {
                    
    if (!groupName.startsWith("/"))
                    {
                        groupName 
    = "/" + groupName;
                    }
                }
                
    else
                {
                    
    return EMPTY_MEMBERS;
                }

                
    try
                {
                    List
    <String> childlist;
                    
    if(watcher == null)
                    {
                        childlist 
    = zk.getChildren(groupName, false);
                    }
                    
    else
                    {
                        childlist 
    = zk.getChildren(groupName, watcher);
                    }

                    List
    <String> lastresult = new ArrayList<String>();
                    
    for (String item : childlist)
                    {
                        
    if (item.startsWith(ESCAPE_PREFIX))
                        {
                            lastresult.add(URLDecoder.decode(item, 
    "UTF-8").substring(3));
                        }
                        
    else
                        {
                            lastresult.add(item);
                        }
                    }

                    
    return lastresult;
                }
                
    catch (Exception e)
                {
                    log.error(
    "fetch group members error", e);
                    
    return EMPTY_MEMBERS;
                }
            }
        }




        GroupMemberCenter主要是把用戶的address信息做一下轉(zhuǎn)義然后在ZooKeeper中創(chuàng)建一個(gè)節(jié)點(diǎn), 注冊(cè)時(shí)使用 CreateMode.EPHEMERAL 模式, 也就是類似心跳監(jiān)測(cè), 如果服務(wù)掛掉, 那么ZooKeeper會(huì)自動(dòng)刪除此節(jié)點(diǎn).


        為了方便測(cè)試, 編寫3個(gè)啟動(dòng)服務(wù)的程序來(lái)模擬多臺(tái)機(jī)器, 啟動(dòng)的都是Hello服務(wù), 只是端口不一樣而已:

        public class HelloServiceServer5Zookeeper1 {
            
    public static void main(String[] args) throws Exception {
                
    new HelloServicePublisher5Zookeeper().start("http://localhost:8081/service/Hello"new HelloFirstImpl());
            }
        }



        其他2個(gè)請(qǐng)自己看源碼包.

       
        下面我們來(lái)準(zhǔn)備Client, 代碼和上篇文章中的一樣, 首先是一個(gè)XML:


    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi
    ="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:jaxws
    ="http://cxf.apache.org/jaxws"
           xmlns:clustering
    ="http://cxf.apache.org/clustering"
           xmlns:util
    ="http://www.springframework.org/schema/util"
           xsi:schemaLocation
    ="
    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
    >


        
    <bean id="loadBalanceStrategy" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
            
    <property name="removeFailedEndpoint" value="true" />
        
    </bean>

        
    <bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
            
    <property name="strategy" ref="loadBalanceStrategy" />
        
    </bean>


        
    <jaxws:client name="helloClient"
                      serviceClass
    ="org.javascud.extensions.cxf.service.Hello"            >
            
    <jaxws:features>
                
    <ref bean="loadBalanceFeature" />
            
    </jaxws:features>
        
    </jaxws:client>

        
    <bean id="zooKeeper" class="org.apache.zookeeper.ZooKeeper">
            
    <constructor-arg index="0" value="127.0.0.1:2181" />
            
    <constructor-arg index="1" value="3000" />
            
    <constructor-arg index="2" ><null/></constructor-arg>
        
    </bean>
    </beans>


        XML沒(méi)有寫任何服務(wù)的網(wǎng)址, 后面的程序負(fù)責(zé)更新服務(wù)列表. 此XML定義了一個(gè)ZooKeeper客戶端, 你可以根據(jù)自己的實(shí)際情況修改, 例如ZooKeeper本身也可以是負(fù)載均衡的 (一般為3臺(tái)服務(wù)器, 方便投票).

       
        調(diào)用的Java代碼如下:



            ClassPathXmlApplicationContext context
                    
    = new ClassPathXmlApplicationContext(new String[]
                    {
    "org/javascud/extensions/cxf/zookeeper/client/loadbalance_fail_zookeeper.xml"});
            
    final Hello client = (Hello) context.getBean("helloClient");

            
    final AbstractLoadBalanceStrategy strategy = (AbstractLoadBalanceStrategy) context.getBean("loadBalanceStrategy");

            Client myclient 
    = ClientProxy.getClient(client);
            String address 
    = myclient.getEndpoint().getEndpointInfo().getAddress();
            System.out.println(address);


            ZooKeeper zk 
    = (ZooKeeper) context.getBean("zooKeeper");
       
        
    //使用定時(shí)刷新的方式更新服務(wù)列表: 實(shí)際代碼中可以寫一個(gè)單獨(dú)的類來(lái)調(diào)用
            ServiceEndpointsFetcher fetcher = new ServiceEndpointsFetcher();
            fetcher.setStrategy(strategy);
            fetcher.setZooKeeper(zk);
            fetcher.setGroupName(groupName);
            fetcher.start();

        
    //調(diào)用服務(wù)
            for (int i = 1; i <= 1000; i++) {
                String result1 
    = client.sayHello("Felix" + i);
                System.out.println(
    "Call " + i + "" + result1);

                
    int left = strategy.getAlternateAddresses(null).size();
                System.out.println(
    "================== left " + left + " ===========================");

                Thread.sleep(
    100);
            }


        查看上面的代碼可以發(fā)現(xiàn), 我們使用了ServiceEndpointsFetcher來(lái)刷新, 間隔固定的時(shí)間去獲取最新的服務(wù)列表.


        我們還可以采用觀察者方式來(lái)更新服務(wù)列表:

    /**
     * watcher service from zookeeper.
     *
     * 
    @author Felix Zhang   Date:2010-10-16 01:13
     
    */
    public class ServiceEndpointsWatcher extends ZooKeeperChildrenWatcher {

        
    private AbstractLoadBalanceStrategy strategy;

        
    public void setStrategy(AbstractLoadBalanceStrategy strategy) {
            
    this.strategy = strategy;
        }

        @Override
        
    protected void updateData(List<String> members) {
            strategy.setAlternateAddresses(members);
        }
    }


        ZooKeeperChildrenWatcher是一個(gè)父類, 調(diào)用GroupMemberCenter的代碼來(lái)監(jiān)測(cè)ZooKeeper上的對(duì)應(yīng)節(jié)點(diǎn):

    /**
     * a Watcher for monitor zookeeper by getChildren
     *
     * 
    @author Felix Zhang   Date:2010-10-16 14:39
     
    */
    public abstract class ZooKeeperChildrenWatcher implements Watcher {
        
    private ZooKeeper zooKeeper;
        
    private String groupName;
        
    private GroupMemberCenter gmc = null;

        
    public void setZooKeeper(ZooKeeper zooKeeper) {
            
    this.zooKeeper = zooKeeper;
        }

        
    public void setGroupName(String groupName) {
            
    this.groupName = groupName;
        }

        @Override
        
    public void process(WatchedEvent event) {
            fetchAndUpdate();
        }

        
    private void fetchAndUpdate() {
            
    //get children and register watcher again
            List<String> members = gmc.fetchGroupMembers(groupName, this);

            updateData(members);
        }

        
    protected abstract void updateData(List<String> members);

        
    public void init() {
            
    if (zooKeeper != null) {
                gmc 
    = new GroupMemberCenter();
                gmc.setZooKeeper(zooKeeper);

                fetchAndUpdate();
            }
        }
    }
       

        調(diào)用ServiceEndpointsWatcher的代碼是在Spring的XML中, 當(dāng)然也可以在單獨(dú)程序中調(diào)用:

       
        <bean id="serviceEndpointsWatcher"
              class
    ="org.javascud.extensions.cxf.zookeeper.ServiceEndpointsWatcher"
                init-method
    ="init">
            
    <property name="strategy" ref="loadBalanceStrategy" />
            
    <property name="zooKeeper" ref="zooKeeper" />
            
    <property name="groupName" value="helloservice" />
        
    </bean>


       
        ok, 下面我們啟動(dòng)ZooKeeper, 在2181端口. 然后其次啟動(dòng)三個(gè)HelloService: HelloServiceServer5Zookeeper1, HelloServiceServer5Zookeeper2, HelloServiceServer5Zookeeper3, 它們分別監(jiān)測(cè)在8081, 8082, 8083端口, 并且會(huì)向ZooKeeper注冊(cè), 你查看用ZooKeeper的客戶端查看 ls /helloservice, 應(yīng)該可以看到三個(gè)節(jié)點(diǎn).

        然后我們運(yùn)行客戶端代碼 HelloClient5Zookeeper, 在客戶端運(yùn)行的過(guò)程中, 我們可以終止/啟動(dòng)HelloService, 就可以看到我們的程序會(huì)動(dòng)態(tài)地訪問(wèn)不同的HelloService, 達(dá)到了負(fù)載均衡的目的.






        注: ServiceEndpointsWatcher 或ServiceEndpointsFetcher 一定現(xiàn)行運(yùn)行, 否則調(diào)用服務(wù)的部分會(huì)拋出異常, 因?yàn)闆](méi)有可用的服務(wù)地址.


    代碼打包下載: http://cnscud.googlecode.com/files/extensions-20101016.zip
    SVN 源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/ 


    轉(zhuǎn)載請(qǐng)注明作者和出處 http://scud.blogjava.net
       


    posted on 2010-10-16 19:37 Scud(飛云小俠) 閱讀(4956) 評(píng)論(1)  編輯  收藏 所屬分類: SOA

    評(píng)論

    # re: 使用ZooKeeper為CXF或其他服務(wù)動(dòng)態(tài)更新服務(wù)器信息 2012-09-07 15:32 zdonking

    吼吼,沒(méi)想到飛哥 業(yè)余還研究代碼  回復(fù)  更多評(píng)論   

    <2010年10月>
    262728293012
    3456789
    10111213141516
    17181920212223
    24252627282930
    31123456

    導(dǎo)航

    統(tǒng)計(jì)

    公告

    文章發(fā)布許可
    創(chuàng)造共用協(xié)議:署名,非商業(yè),保持一致

    我的郵件
    cnscud # gmail


    常用鏈接

    留言簿(15)

    隨筆分類(113)

    隨筆檔案(103)

    相冊(cè)

    友情鏈接

    技術(shù)網(wǎng)站

    搜索

    積分與排名

    最新評(píng)論

    閱讀排行榜

    評(píng)論排行榜

    主站蜘蛛池模板: 国产精品亚洲一区二区在线观看| 中文字幕亚洲综合久久男男| 国产一区二区三区免费| 99亚洲精品卡2卡三卡4卡2卡| 狠狠热精品免费观看| 亚洲精品无码久久久久久| 亚洲一区二区三区播放在线| 亚洲国产精品无码中文字| 中文字幕亚洲不卡在线亚瑟| 亚洲电影国产一区| 亚洲成a人片在线观看日本| 亚洲综合激情九月婷婷| 久久精品国产亚洲AV高清热 | 国产精品亚洲w码日韩中文| 午夜亚洲福利在线老司机| 日韩高清在线免费观看| 久久久www成人免费毛片 | 好爽又高潮了毛片免费下载| 国产v精品成人免费视频400条| 色老头永久免费网站| 亚洲一级免费视频| 五月婷婷亚洲综合| 亚洲人成网站在线播放影院在线| 亚洲系列中文字幕| 色偷偷亚洲第一综合| 未满十八18禁止免费无码网站 | 亚洲国产成人精品电影| 亚洲综合色区中文字幕| 亚洲精品天堂成人片AV在线播放| 亚洲精品日韩一区二区小说| 中文无码成人免费视频在线观看| 久久久久久成人毛片免费看| 在线免费观看污网站| 亚洲va无码va在线va天堂| 久久精品国产亚洲AV大全| 国产亚洲综合久久| 91成人免费观看在线观看| 99爱视频99爱在线观看免费| 日韩免费精品视频| 日本xxwwxxww在线视频免费| 亚洲最新视频在线观看|