<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    使用ZooKeeper為CXF或其他服務動態更新服務器信息




    ZooKeeper是一個優秀的協調服務, 目前是Hadoop的一個子項目. 我們可以用它來為我們的服務提供配置中心, 注冊中心, 分布式同步鎖, 消息隊列等服務, 更多信息請瀏覽 http://hadoop.apache.org/zookeeper/

    上篇文章中實現一個CXF的負載均衡服務, 本次我們使用ZooKeeper來為我們的服務提供動態服務器列表, 以便把客戶端的調用分配到各個有效的服務上去.


    動態更新服務列表有2種方法
      * 定時去獲取數據, 更新我們的數據 --- 通用
      * 使用ZooKeeper的watch特性, 有服務器加入/退出時我們自動獲取通知 --- 適用于有消息通知機制的


       首先我們的HelloService部分要向ZooKeeper注冊
        只有注冊到ZooKeeper上, 我們才知道你可以提供這個服務. 在實際環境中, 需要每個服務都需要向ZooKeeper注冊 ()
       
        注冊代碼如下:

        private void register2Zookeeper(String address) throws Exception
        {
            ZooKeeper zk 
    = new ZooKeeper(zkAddress, 3000null);

            GroupMemberCenter gmc 
    = new GroupMemberCenter();
            gmc.setZooKeeper(zk);

            gmc.createAndSetGroup(groupName);
            gmc.joinGroupByDefine(address);

            System.out.println(
    "register service to zookeeper: " + address);
        }





        GroupMemberCenter是一個輔助類, 代碼如下:


        /**
         * Dynamic member center.
         * <p/>
         * The member maybe leave or dead dynamiclly.
         *
         *
         * 
    @author: Felix Zhang  Date: 2010-9-30 17:58:16
         
    */
        
    public class GroupMemberCenter
        {
            
    public static final String ESCAPE_PREFIX = "|||";

            
    private static final Log log = LogFactory.getLog(GroupMemberCenter.class);
            
    private static final List<String> EMPTY_MEMBERS = new ArrayList<String>(0);

            
    private ZooKeeper zk;
            
    private String group = "";

            
    private String me = "";

            
    public void setZooKeeper(ZooKeeper zk)
            {
                
    this.zk = zk;
            }

            
    public void setGroup(String groupName)
            {
                
    if (groupName != null && groupName.length() > 0)
                {
                    
    if (!groupName.startsWith("/"))
                    {
                        groupName 
    = "/" + groupName;
                    }

                    
    this.group = groupName;
                }
            }

            
    public boolean createAndSetGroup(String groupName)
            {
                
    boolean result = createGroup(groupName);

                
    if (result)
                {
                    setGroup(groupName);
                }

                
    return result;
            }

            
    public boolean createGroup(String groupName)
            {
                
    assert groupName != null;

                
    if (!groupName.startsWith("/"))
                {
                    groupName 
    = "/" + groupName;
                }

                
    try
                {
                    Stat s 
    = zk.exists(groupName, false);

                    
    if (s == null)
                    {
                        zk.create(groupName, 
    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                }
                
    catch (Exception e)
                {
                    log.error(
    "create group error: " + groupName, e);
                    
    return false;
                }
                
    return true;
            }

            
    protected String buildName(String name)
            {
                
    return group + "/" + name;
            }

            
    public boolean joinGroup()
            {
                
    return joinGroup(null);
            }

            
    public boolean joinGroup(Integer port)
            {
                
    try
                {
                    
    //use ipaddress as default, if you will use different ipaddress, you need joinGroup(yourip)
                    me = InetAddress.getLocalHost().getHostAddress();
                    
    return joinGroupByDefine(me + ":" + port);
                }
                
    catch (Exception e)
                {
                    log.error(
    "join group error", e);
                    
    return false;
                }
            }

            
    public boolean joinGroupByDefine(String userdefine)
            {
                
    assert userdefine != null;
                
    assert userdefine.length() > 0;

                
    try
                {
                    me 
    = userdefine;
                    
    if (me.contains("[host]"))
                    {
                        String host 
    = InetAddress.getLocalHost().getHostAddress();
                        me 
    = me.replaceFirst("\\[host\\]", host);
                    }

                    
    //if contains "/", how to deal?      --- maybe we need more format in future
                    me = ESCAPE_PREFIX + URLEncoder.encode(me, "UTF-8");

                    zk.create(buildName(me), 
    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                }
                
    catch (Exception e)
                {
                    log.error(
    "join group error: " + me, e);
                    
    return false;
                }

                
    return true;
            }

            
    public void leaveGroup()
            {
                
    try
                {
                    zk.delete(buildName(me), 
    0);
                }
                
    catch (Exception e)
                {
                    log.error(
    "leave group error: " + me, e);
                }
            }

            
    public List<String> fetchGroupMembers()
            {
                
    return fetchGroupMembers(group, null);
            }

            
    public List<String> fetchGroupMembers(String groupName)
            {
                
    return fetchGroupMembers(groupName, null);
            }

            
    public List<String> fetchGroupMembers(String groupName, Watcher watcher)
            {
                
    if (groupName != null && groupName.length() > 0)
                {
                    
    if (!groupName.startsWith("/"))
                    {
                        groupName 
    = "/" + groupName;
                    }
                }
                
    else
                {
                    
    return EMPTY_MEMBERS;
                }

                
    try
                {
                    List
    <String> childlist;
                    
    if(watcher == null)
                    {
                        childlist 
    = zk.getChildren(groupName, false);
                    }
                    
    else
                    {
                        childlist 
    = zk.getChildren(groupName, watcher);
                    }

                    List
    <String> lastresult = new ArrayList<String>();
                    
    for (String item : childlist)
                    {
                        
    if (item.startsWith(ESCAPE_PREFIX))
                        {
                            lastresult.add(URLDecoder.decode(item, 
    "UTF-8").substring(3));
                        }
                        
    else
                        {
                            lastresult.add(item);
                        }
                    }

                    
    return lastresult;
                }
                
    catch (Exception e)
                {
                    log.error(
    "fetch group members error", e);
                    
    return EMPTY_MEMBERS;
                }
            }
        }




        GroupMemberCenter主要是把用戶的address信息做一下轉義然后在ZooKeeper中創建一個節點, 注冊時使用 CreateMode.EPHEMERAL 模式, 也就是類似心跳監測, 如果服務掛掉, 那么ZooKeeper會自動刪除此節點.


        為了方便測試, 編寫3個啟動服務的程序來模擬多臺機器, 啟動的都是Hello服務, 只是端口不一樣而已:

        public class HelloServiceServer5Zookeeper1 {
            
    public static void main(String[] args) throws Exception {
                
    new HelloServicePublisher5Zookeeper().start("http://localhost:8081/service/Hello"new HelloFirstImpl());
            }
        }



        其他2個請自己看源碼包.

       
        下面我們來準備Client, 代碼和上篇文章中的一樣, 首先是一個XML:


    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi
    ="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:jaxws
    ="http://cxf.apache.org/jaxws"
           xmlns:clustering
    ="http://cxf.apache.org/clustering"
           xmlns:util
    ="http://www.springframework.org/schema/util"
           xsi:schemaLocation
    ="
    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
    >


        
    <bean id="loadBalanceStrategy" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
            
    <property name="removeFailedEndpoint" value="true" />
        
    </bean>

        
    <bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
            
    <property name="strategy" ref="loadBalanceStrategy" />
        
    </bean>


        
    <jaxws:client name="helloClient"
                      serviceClass
    ="org.javascud.extensions.cxf.service.Hello"            >
            
    <jaxws:features>
                
    <ref bean="loadBalanceFeature" />
            
    </jaxws:features>
        
    </jaxws:client>

        
    <bean id="zooKeeper" class="org.apache.zookeeper.ZooKeeper">
            
    <constructor-arg index="0" value="127.0.0.1:2181" />
            
    <constructor-arg index="1" value="3000" />
            
    <constructor-arg index="2" ><null/></constructor-arg>
        
    </bean>
    </beans>


        XML沒有寫任何服務的網址, 后面的程序負責更新服務列表. 此XML定義了一個ZooKeeper客戶端, 你可以根據自己的實際情況修改, 例如ZooKeeper本身也可以是負載均衡的 (一般為3臺服務器, 方便投票).

       
        調用的Java代碼如下:



            ClassPathXmlApplicationContext context
                    
    = new ClassPathXmlApplicationContext(new String[]
                    {
    "org/javascud/extensions/cxf/zookeeper/client/loadbalance_fail_zookeeper.xml"});
            
    final Hello client = (Hello) context.getBean("helloClient");

            
    final AbstractLoadBalanceStrategy strategy = (AbstractLoadBalanceStrategy) context.getBean("loadBalanceStrategy");

            Client myclient 
    = ClientProxy.getClient(client);
            String address 
    = myclient.getEndpoint().getEndpointInfo().getAddress();
            System.out.println(address);


            ZooKeeper zk 
    = (ZooKeeper) context.getBean("zooKeeper");
       
        
    //使用定時刷新的方式更新服務列表: 實際代碼中可以寫一個單獨的類來調用
            ServiceEndpointsFetcher fetcher = new ServiceEndpointsFetcher();
            fetcher.setStrategy(strategy);
            fetcher.setZooKeeper(zk);
            fetcher.setGroupName(groupName);
            fetcher.start();

        
    //調用服務
            for (int i = 1; i <= 1000; i++) {
                String result1 
    = client.sayHello("Felix" + i);
                System.out.println(
    "Call " + i + "" + result1);

                
    int left = strategy.getAlternateAddresses(null).size();
                System.out.println(
    "================== left " + left + " ===========================");

                Thread.sleep(
    100);
            }


        查看上面的代碼可以發現, 我們使用了ServiceEndpointsFetcher來刷新, 間隔固定的時間去獲取最新的服務列表.


        我們還可以采用觀察者方式來更新服務列表:

    /**
     * watcher service from zookeeper.
     *
     * 
    @author Felix Zhang   Date:2010-10-16 01:13
     
    */
    public class ServiceEndpointsWatcher extends ZooKeeperChildrenWatcher {

        
    private AbstractLoadBalanceStrategy strategy;

        
    public void setStrategy(AbstractLoadBalanceStrategy strategy) {
            
    this.strategy = strategy;
        }

        @Override
        
    protected void updateData(List<String> members) {
            strategy.setAlternateAddresses(members);
        }
    }


        ZooKeeperChildrenWatcher是一個父類, 調用GroupMemberCenter的代碼來監測ZooKeeper上的對應節點:

    /**
     * a Watcher for monitor zookeeper by getChildren
     *
     * 
    @author Felix Zhang   Date:2010-10-16 14:39
     
    */
    public abstract class ZooKeeperChildrenWatcher implements Watcher {
        
    private ZooKeeper zooKeeper;
        
    private String groupName;
        
    private GroupMemberCenter gmc = null;

        
    public void setZooKeeper(ZooKeeper zooKeeper) {
            
    this.zooKeeper = zooKeeper;
        }

        
    public void setGroupName(String groupName) {
            
    this.groupName = groupName;
        }

        @Override
        
    public void process(WatchedEvent event) {
            fetchAndUpdate();
        }

        
    private void fetchAndUpdate() {
            
    //get children and register watcher again
            List<String> members = gmc.fetchGroupMembers(groupName, this);

            updateData(members);
        }

        
    protected abstract void updateData(List<String> members);

        
    public void init() {
            
    if (zooKeeper != null) {
                gmc 
    = new GroupMemberCenter();
                gmc.setZooKeeper(zooKeeper);

                fetchAndUpdate();
            }
        }
    }
       

        調用ServiceEndpointsWatcher的代碼是在Spring的XML中, 當然也可以在單獨程序中調用:

       
        <bean id="serviceEndpointsWatcher"
              class
    ="org.javascud.extensions.cxf.zookeeper.ServiceEndpointsWatcher"
                init-method
    ="init">
            
    <property name="strategy" ref="loadBalanceStrategy" />
            
    <property name="zooKeeper" ref="zooKeeper" />
            
    <property name="groupName" value="helloservice" />
        
    </bean>


       
        ok, 下面我們啟動ZooKeeper, 在2181端口. 然后其次啟動三個HelloService: HelloServiceServer5Zookeeper1, HelloServiceServer5Zookeeper2, HelloServiceServer5Zookeeper3, 它們分別監測在8081, 8082, 8083端口, 并且會向ZooKeeper注冊, 你查看用ZooKeeper的客戶端查看 ls /helloservice, 應該可以看到三個節點.

        然后我們運行客戶端代碼 HelloClient5Zookeeper, 在客戶端運行的過程中, 我們可以終止/啟動HelloService, 就可以看到我們的程序會動態地訪問不同的HelloService, 達到了負載均衡的目的.






        注: ServiceEndpointsWatcher 或ServiceEndpointsFetcher 一定現行運行, 否則調用服務的部分會拋出異常, 因為沒有可用的服務地址.


    代碼打包下載: http://cnscud.googlecode.com/files/extensions-20101016.zip
    SVN 源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/ 


    轉載請注明作者和出處 http://scud.blogjava.net
       


    posted on 2010-10-16 19:37 Scud(飛云小俠) 閱讀(4957) 評論(1)  編輯  收藏 所屬分類: SOA

    評論

    # re: 使用ZooKeeper為CXF或其他服務動態更新服務器信息 2012-09-07 15:32 zdonking

    吼吼,沒想到飛哥 業余還研究代碼  回復  更多評論   

    <2010年10月>
    262728293012
    3456789
    10111213141516
    17181920212223
    24252627282930
    31123456

    導航

    統計

    公告

    文章發布許可
    創造共用協議:署名,非商業,保持一致

    我的郵件
    cnscud # gmail


    常用鏈接

    留言簿(15)

    隨筆分類(113)

    隨筆檔案(103)

    相冊

    友情鏈接

    技術網站

    搜索

    積分與排名

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 九九精品成人免费国产片| 亚洲精品在线不卡| 亚洲国产品综合人成综合网站| 亚洲已满18点击进入在线观看| 国产偷伦视频免费观看| 亚洲午夜福利717| 久久性生大片免费观看性| 亚洲国产一区视频| 国产精品美女久久久免费 | 7777久久亚洲中文字幕蜜桃| a毛片久久免费观看| 亚洲国产精品无码久久久不卡| 亚洲真人无码永久在线观看| 性做久久久久久免费观看| 色偷偷亚洲女人天堂观看欧| 好爽…又高潮了免费毛片| 亚洲综合久久精品无码色欲| 水蜜桃视频在线观看免费| 亚洲av无码专区在线观看素人| 四虎永久成人免费| 青青久久精品国产免费看| 久久激情亚洲精品无码?V| 成人性生交大片免费看好| 亚洲精品在线观看视频| 国产精品免费观看| 在线播放亚洲精品| 国产精品亚洲аv无码播放| 四虎最新永久免费视频| 亚洲精品V天堂中文字幕| 国产91精品一区二区麻豆亚洲| 巨胸狂喷奶水视频www网站免费| 亚洲欧洲日产国产综合网| 国产日本一线在线观看免费| 色一情一乱一伦一视频免费看| 中文字幕精品亚洲无线码一区应用| 久久免费香蕉视频| 亚洲另类精品xxxx人妖| 亚洲国产av一区二区三区| 特级无码毛片免费视频尤物| 亚洲一级毛片免费在线观看| 亚洲人成网站18禁止一区 |