轉載請注明作者和出處 http://scud.blogjava.net
CXF是一個比較流行的Web Service框架. ( 當然如果追求更高效, 還可以去搜索ice, thrift, protobuff之類的)
近一個月, 斷斷續續地又好好看了看CXF的一些代碼, CXF的文檔還是很欠缺,特別是關于內部實現的東西. 從我的感覺來說, 內部實現還是挺復雜的. Inteceptor, Feature, ConduitSelector 這些概念一大堆, 又差不多可以做類似的事情, 真是讓人頭暈.
CXF本身提供了一個FailoverFeature, 可以在調用服務出錯時切換到其他服務器, 但是無法做到負載均衡, 我研究了幾天, 在FailoverFeature的基礎上改出來一個LoadBalanceFeature, 當然也同時支持Failover.
首先我們來看看如何使用CXF的FailoverFeature: (下載示例中包括使用xml和代碼兩種方式, 當然CXF自己還提供了使用wsdl內部定義的方式)
我們需要先準備一個HelloService, 非常簡單的一個Web Service, 這里不在貼出, 具體可以看下載包
調用代碼示例:
package org.javascud.extensions.cxf.testfailover;
import org.apache.cxf.clustering.FailoverFeature;
import org.apache.cxf.clustering.RandomStrategy;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.javascud.extensions.cxf.service.Hello;
import java.util.ArrayList;
import java.util.List;
public class HelloServiceFailOverClient
{
public static void main(String[] args)
{
String helloFirst = "http://localhost:8080/service/Hello";
String helloSecond = "http://localhost:8081/service/Hello";
String helloThird = "http://localhost:8082/service/Hello";
String helloFour = "http://localhost:8083/service/Hello";
List<String> serviceList = new ArrayList<String>();
serviceList.add(helloFirst);
serviceList.add(helloSecond);
serviceList.add(helloThird);
//serviceList.add(helloFour);
RandomStrategy strategy = new RandomStrategy();
strategy.setAlternateAddresses(serviceList);
FailoverFeature ff = new FailoverFeature();
ff.setStrategy(strategy);
JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
List<AbstractFeature> features = new ArrayList<AbstractFeature>();
features.add(ff);
factory.setFeatures(features);
factory.initFeatures();
factory.setServiceClass(Hello.class);
//factory.setAddress("http://localhost:8080/service/Hello");
Hello client = (Hello) factory.create();
String result = client.sayHello("felix");
System.out.println("result is: " + result);
}
}
在遇到錯誤時可以自動使用下一個服務器, 但是必須要自己設置一個地址, 如果不設置的話也可以, 但是會出錯然后failover.
下面我們自己來看看我們的 LoadBalanceFeature
1. 首先我們創建一個LoadBalanceFeature (完全和FailoverFeature一樣)
Feature是用來定制Server, Client, Bus的一個組件, 具體可以查看AbstractFeature, 我們使用initialize方法來定制Client, 修改Client的Conduit選擇器達到負載均衡的目的.
LoadBalanceFeature代碼如下:
/**
* This feature may be applied to a Client so as to enable
* load balance , use any compatible endpoint for the target service.
*
* @author Felix Zhang Date:2010-10-3 22:58
* @see org.apache.cxf.clustering.FailoverFeature
*/
public class LoadBalanceFeature extends AbstractFeature {
private LoadBalanceStrategy loadBalanceStrategy;
@Override
public void initialize(Client client, Bus bus) {
LoadBalanceTargetSelector selector = new LoadBalanceTargetSelector();
selector.setEndpoint(client.getEndpoint());
selector.setStrategy(getStrategy());
client.setConduitSelector(selector);
}
public void setStrategy(LoadBalanceStrategy strategy) {
loadBalanceStrategy = strategy;
}
public LoadBalanceStrategy getStrategy() {
return loadBalanceStrategy;
}
}
2. 定制一個LoadBalanceStrategy 負載均衡策略
負載均衡策略有很多種, 例如隨機選擇, 順序選擇等, FailoverFeature提供了三種策略, 總之很簡單, 我們在這里就先實現隨機策略, 其他的策略都很簡單, 幾行代碼就可以實現了.
這個類主要用來設置/獲取所有的提供服務的地址列表, 為了方便控制, 我新增了2個選項:
A: alwaysChangeEndpoint 是否每次請求都切換地址: 如果只有一個客戶端, 可以分擔負載. 缺省為true
B: removeFailedEndpoint 是否從全局的地址列表中移除失敗服務地址 -- 如果你沒有監測服務器狀態的程序
關于動態增刪服務地址
- 可以使用zookeeper等服務實時監測服務器狀態, 或者自己寫程序實現, 調用strategy.setAlternateAddresses即可.
- removeFailedEndpoint 如果設置為true, 但沒有監測服務器狀態的程序, 新增的或者復活的服務器則無法被恢復到地址列表中.
- 考慮到效率和支持failover, 設置地址列表, 移除地址等沒有同步鎖.
- 自動移除失敗服務地址時, 目前僅支持手動地址列表, 沒有考慮wsdl中的多服務地址.
- 后續我會寫一個使用zookeeper增刪服務地址列表的示例. (最近也在看zookeeper)
主要的代碼都在AbstractLoadBalanceStrategy 中, 基本和 AbstractStaticFailoverStrategy 一樣, 添加了一個removeAlternateAddress 用于移除失敗的服務地址.
LoadBalanceStrategy 接口的代碼如下:
/**
* Supports pluggable strategies for alternate endpoint selection on
* load balance.
* <p/>
* Random, Retries, Mod (later)
* <p/>
* 1. support load balance 2.support fail over.
*
* @author Felix Zhang Date:2010-10-1 18:14
* @see org.apache.cxf.clustering.FailoverStrategy
*/
public interface LoadBalanceStrategy {
/**
* Get the alternate endpoints for this invocation.
*
* @param exchange the current Exchange
* @return a failover endpoint if one is available
*/
List<Endpoint> getAlternateEndpoints(Exchange exchange);
/**
* Select one of the alternate endpoints for a retried invocation.
*
* @param alternates List of alternate endpoints if available
* @return the selected endpoint
*/
Endpoint selectAlternateEndpoint(List<Endpoint> alternates);
/**
* Get the alternate addresses for this invocation.
* These addresses over-ride any addresses specified in the WSDL.
*
* @param exchange the current Exchange
* @return a failover endpoint if one is available
*/
List<String> getAlternateAddresses(Exchange exchange);
/**
* Select one of the alternate addresses for a retried invocation.
*
* @param addresses List of alternate addresses if available
* @return the selected address
*/
String selectAlternateAddress(List<String> addresses);
/**
* should remove failed endpoint or not.
* only work for user defined addresses list.
* @return true or false
*/
boolean isRemoveFailedEndpoint();
/**
* change endpoint every time or not.
* @return boolean
*/
boolean isAlwaysChangeEndpoint();
/**
* remove failed address from list.
* @param address the failed address
*/
void removeAlternateAddress(String address);
}
RandomLoadBalanceStrategy繼承自 AbstractLoadBalanceStrategy, 和 RandomStrategy的區別就是獲取下一個服務地址時并不從列表中移除此地址, 否則就做不到負載均衡了.
3. 最重要的 LoadBalanceTargetSelector
A: 這個類比較復雜, 我們為了實現負載均衡, 修改了
prepare來動態設置調用的endpoint, 替換策略取決于LoadBalanceStrategy
主要代碼如下:
boolean existsEndpoint = false;
//check current endpoint is not null
Endpoint theEndpoint = exchange.get(Endpoint.class);
if (theEndpoint.getEndpointInfo().getAddress() != null) {
existsEndpoint = true;
}
Endpoint nextEndpoint;
if (getStrategy().isAlwaysChangeEndpoint() || !existsEndpoint) {
//get a endpoint and set to current endpoint
Endpoint loadBalanceTarget = getLoadBalanceTarget(exchange);
if (loadBalanceTarget != null) {
logger.info("switch to next target: " + loadBalanceTarget.getEndpointInfo().getAddress());
setEndpoint(loadBalanceTarget);
//update exchange.org.apache.cxf.message.Message.ENDPOINT_ADDRESS --- 不設置這個就用上次的
奇怪
message.put(Message.ENDPOINT_ADDRESS, loadBalanceTarget.getEndpointInfo().getAddress());
}
nextEndpoint = loadBalanceTarget;
} else {
//use current endpoint
nextEndpoint = theEndpoint;
}
B:為了和原有Failover特性兼容, 我們修改了
getFailoverTarget函數, 在此函數中要移除失敗的服務地址, 因為在之前我們修改了LoadBalanceStrategy, 它在獲取地址時不再移除當前地址, 所以我們需要手動移除.
部分代碼如下:
String currentAddress = getEndpoint().getEndpointInfo().getAddress();
//failover should remove current endpoint first, then get next -- 根據定義的策略來決定是否從全局地址列表中移除
if (getStrategy().isRemoveFailedEndpoint()) {
logger.warn("remove current failed address: " + currentAddress);
//remove for client, not for current invocation -- 沒有同步鎖
getStrategy().removeAlternateAddress(currentAddress);
}
//remove for current invocation: 當前請求中總是移除失敗服務地址
alternateAddresses.remove(currentAddress);
String alternateAddress =
getStrategy().selectAlternateAddress(alternateAddresses);
4. 調用實例:
此處我們采用XML定義方式:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xmlns:clustering="http://cxf.apache.org/clustering"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<util:list id="addressList">
<value>http://localhost:8081/service/Hello</value>
<value>http://localhost:8082/service/Hello</value>
<value>http://localhost:8083/service/Hello</value>
<value>http://localhost:8086/service/Hello</value>
<value>http://localhost:8087/service/Hello</value>
<value>http://localhost:8088/service/Hello</value>
</util:list>
<bean id="SequentialAddresses" class="org.apache.cxf.clustering.SequentialStrategy">
<property name="alternateAddresses">
<ref bean="addressList"/>
</property>
</bean>
<bean id="randomAddresses" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
<property name="alternateAddresses">
<ref bean="addressList"/>
</property>
<property name="removeFailedEndpoint" value="true" />
</bean>
<bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
<property name="strategy" ref="randomAddresses" />
</bean>
<jaxws:client name="helloClient"
serviceClass="org.javascud.extensions.cxf.service.Hello" >
<jaxws:features>
<ref bean="loadBalanceFeature" />
</jaxws:features>
</jaxws:client>
</beans>
8081, 8082, 8083是實際存在的服務, 其他的不存在.
調用的Java代碼:
package org.javascud.extensions.cxf.loadbalance;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.javascud.extensions.cxf.LoadBalanceStrategy;
import org.javascud.extensions.cxf.service.Hello;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class HelloLoadBalanceAndFailOverClientByXML
{
public static void main(String[] args)
{
ClassPathXmlApplicationContext context
= new ClassPathXmlApplicationContext(new String[]
{"org/javascud/extensions/cxf/loadbalance/loadbalance_fail.xml"});
Hello client = (Hello) context.getBean("helloClient");
LoadBalanceStrategy strategy = (LoadBalanceStrategy) context.getBean("randomAddresses");
Client myclient = ClientProxy.getClient(client);
String address = myclient.getEndpoint().getEndpointInfo().getAddress();
System.out.println(address);
for(int i=1; i<=20; i++)
{
String result1 = client.sayHello("Felix" +i);
System.out.println("Call " + i +": " + result1);
int left = strategy.getAlternateAddresses(null).size();
System.out.println("================== left " + left + " ===========================");
}
}
}
此處僅僅為模擬測試.
5. 關于測試用例
沒想好如何寫單元測試, test里面目前都是隨意測試的代碼, 基本照顧到所有功能.
6. 下載
代碼下載: http://cnscud.googlecode.com/files/extensions-cxf_20101015.zip
源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/ 其中cxf目錄是此文章相關的源碼.
7. 有任何問題請留言.
轉載請注明作者和出處 http://scud.blogjava.net