@import url(http://www.tkk7.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css); 面向服務(wù)的架構(gòu)和事件驅(qū)動(dòng)的架構(gòu)天生就有著對分布式系統(tǒng)的適應(yīng)性,這些架構(gòu)都有著模塊性、松散耦合,和適應(yīng)性等特性。
“面向服務(wù)”這個(gè)術(shù)語已經(jīng)演變成一個(gè)架構(gòu),在那里服務(wù)作為一個(gè)軟件組件嵌入在企業(yè)業(yè)務(wù)邏輯和特新的核心中,特性如下:
·        松散耦合:服務(wù)部與其它組件有著根深蒂固的關(guān)系
·        協(xié)議獨(dú)立:多種協(xié)議透明訪問
·        位置不可知:一個(gè)服務(wù)執(zhí)行一組業(yè)務(wù)邏輯,針對這次調(diào)用返回一個(gè)結(jié)果
·        粗粒度:不論在什么位置均可訪問該服務(wù)。
·        維護(hù)無用戶狀態(tài)


一個(gè)事件驅(qū)動(dòng)框架(EDA)定義了一個(gè)設(shè)計(jì)和實(shí)現(xiàn)一個(gè)應(yīng)用系統(tǒng)得方法學(xué),在這個(gè)系統(tǒng)里事件可傳輸于松散耦合的軟件組件和服務(wù)之間。一個(gè)事件驅(qū)動(dòng)系統(tǒng)典型地由事件消費(fèi)者和事件產(chǎn)生者組成。事件消費(fèi)者向事件管理器訂閱事件,事件產(chǎn)生者向事件管理器發(fā)布事件。當(dāng)事件管理器從事件產(chǎn)生者那接收到一個(gè)事件時(shí),事件管理把這個(gè)事件轉(zhuǎn)送給相應(yīng)的事件消費(fèi)者。如果這個(gè)事件消費(fèi)者是不可用的,事件管理這將保留這個(gè)事件,一段間隔之后再次轉(zhuǎn)送該事件消費(fèi)者。這種事件傳送方法在基于消息的系統(tǒng)里就是:儲存(store)和轉(zhuǎn)送(forward)。

構(gòu)建一個(gè)包含事件驅(qū)動(dòng)構(gòu)架的應(yīng)用程序和系統(tǒng),這樣就使得這些應(yīng)用程序和系統(tǒng)響應(yīng)更靈敏,因?yàn)槭录?qū)動(dòng)的系統(tǒng)更適合應(yīng)用在不可預(yù)知的和異步的環(huán)境里。

事件驅(qū)動(dòng)設(shè)計(jì)和開發(fā)的優(yōu)勢:
事件驅(qū)動(dòng)設(shè)計(jì)和開發(fā)所提供的優(yōu)勢如下:
·        可以更容易開發(fā)和維護(hù)大規(guī)模分布式應(yīng)用程序和不可預(yù)知的服務(wù)或異步服務(wù)
·        可以很容易,低成本地集成、再集成、再配置新的和已存在的英勇程序和服務(wù)
·        促進(jìn)遠(yuǎn)程組件和服務(wù)的再使用,擁有一個(gè)更靈敏、沒有Bug的開發(fā)環(huán)境
·        短期利益:更容易定制。因?yàn)樵O(shè)計(jì)對動(dòng)態(tài)處理又更好的響應(yīng)。
·        長期利益:系統(tǒng)和組織的狀態(tài)變得更精準(zhǔn),對實(shí)時(shí)變化的響應(yīng)接近于同步

Mule是一個(gè)開源消息ESB框架,一個(gè)消息代理,一個(gè)分級事件驅(qū)動(dòng)的框架(SEDA)。SEDA定義了一個(gè)依照分級隊(duì)列、高度并行的企業(yè)級平臺。Mule使用SED的概念增加事件處理的性能。

Mule事件對象
Mule事件對象包含事件數(shù)據(jù)和被組件所感知和操控的屬性。屬性是任意的,在事件創(chuàng)建之后任何時(shí)間可被設(shè)置。
org.mule.umo.UMOEvent類代表了一個(gè)在Mule環(huán)境中出現(xiàn)的時(shí)間。所有在組件之間發(fā)送或接收的數(shù)據(jù)都是org.mule.umo.UMOEvent的一個(gè)實(shí)體。可以訪問一個(gè)原始的或被轉(zhuǎn)換的Mule事件對象中的數(shù)據(jù)能。一個(gè)Mule事件對象使用一個(gè)與提供者管理的提供者轉(zhuǎn)換數(shù)據(jù),提供者收到數(shù)據(jù)后把事件中的有效載荷轉(zhuǎn)換成當(dāng)前組件所識別的格式。

一個(gè)Mule事件對象的有效載荷能通過org.mule.umo.UMOMessage接口訪問,一個(gè)org.mule.umo.UMOMessage實(shí)例由有效載荷和它的屬性組成。這個(gè)接口是不同技術(shù)實(shí)現(xiàn)的消息對象的一個(gè)抽象。

org.mule.extras.client.MuleClient類定義了一個(gè)簡單的借口,允許Mule客戶端從Mule服務(wù)器接收和發(fā)送事件數(shù)據(jù)。在大多數(shù)Mule應(yīng)用程序里,事件是被一些外部的并發(fā)行為所觸發(fā),例如一個(gè)主題上接收到消息或在目錄里一個(gè)文件被刪除。

Mule支持同步、異步和請求響應(yīng)事件,事件處理和傳輸實(shí)用不同的技術(shù)例如JMS,HTTP,電子郵件和基于XML的RPC。Mule能很容易地嵌入到任何應(yīng)用框架中,明確支持Spring框架。Mule也支持動(dòng)態(tài)的,預(yù)定義的,基于內(nèi)容的和基于規(guī)則的消息路由。Mule使得預(yù)定義的和計(jì)劃性的事務(wù)更容易,包括XA事務(wù)支持。Mule提供一個(gè)有代表性的狀態(tài)調(diào)用(REST)API提供給與Web的事件訪問。

Mule ESB模式驅(qū)動(dòng)系統(tǒng)中所有服務(wù),這個(gè)系統(tǒng)有著一個(gè)分離的消息通訊中樞。服務(wù)注冊在總線上,但不知道其他任何被注冊的消息;因此,每個(gè)服務(wù)只關(guān)心處理它收到的事件。Mule也把容器,傳輸,轉(zhuǎn)換細(xì)節(jié)從服務(wù)中分離出來,允許任何對象作為服務(wù)注冊到總線的。

下面演示了如何去發(fā)送一個(gè)同步事件到另外的Mule組件:
String componentName = "MyReceiver";     // The name of the receiving component. 
String transformers = null;                      // A comma-separated list of transformers
                                                         
// to apply to the result message. 
String payload = "A test event";              // The payload of the event. 
java.util.Map messageProperties = null;    // Any properties to be associated
                                                        
// with the payload.
MuleClient client = new MuleClient();
UMOMessage message 
= client.sendDirect(componentName,
                                       transformers,
                                       payload,
                                       messageProperties);
System.out.println(
"Event result: " + message.getPayloadAsString())

MuleClient類需要一個(gè)服務(wù)器URL區(qū)定義它所連接的遠(yuǎn)程Mule服務(wù)器的終端。URL定義了傳輸協(xié)議、接收消息的地址,提供者在派遣一個(gè)事件時(shí)可以隨時(shí)使用這些信息。終端例示如下:
·        vm://com.jeffhanson.receivers.Default: 使用虛擬機(jī)的提供者派遣到一個(gè)com.jeffhanson.receivers.Default
·        jms://jmsProvider/accounts.topic:使用全局注冊的jmsProvider派遣一個(gè)JMS消息到ccounts.topic.
·        jms://accounts.topic: 使用第一個(gè)(默認(rèn))的JMS提供者派遣JMS消息

Mule事件處理
Mule可以在三種不同的方式發(fā)送和接收事件:
1.異步方式:一個(gè)組件可通過不同的線程同時(shí)處理多個(gè)事件的發(fā)送和接收
2.同步方式:在一個(gè)組件重新工作之前,一個(gè)單一的事件必須被處理完。換言之,一個(gè)創(chuàng)建了事件的組件發(fā)送事件時(shí)將被阻斷,直到發(fā)送任務(wù)完成,因此,一次只允許處理一個(gè)事件
3.請求-應(yīng)答方式:一個(gè)組件專門請求一個(gè)事件,然后等待一個(gè)特定的時(shí)間去接收回應(yīng)。
org.mule.impl.MuleComponent實(shí)現(xiàn)類提供了一個(gè)具體的組件類,它包括又有創(chuàng)建,發(fā)送和接收事件的功能。
執(zhí)行同步動(dòng)作的對象應(yīng)該實(shí)現(xiàn)org.mule.umo.lifecycle.Callable接口,這個(gè)定義了一個(gè)簡單的方法Object onCall(UMOEventContext eventContext)。Callable接口提供支持事件調(diào)用的UMO對象。雖然不是強(qiáng)制的,但這個(gè)接口提供了一個(gè)生命周期控制的方法,當(dāng)實(shí)現(xiàn)這個(gè)接口的組建接收到一個(gè)消息時(shí)執(zhí)行這個(gè)方法。下面展示了這個(gè)接口的簡單實(shí)現(xiàn)。
import org.mule.umo.lifecycle.Callable;

public class EchoComponent
   
implements Callable
{
    
public Object onCall(UMOEventContext context) throws Exception
    {
        String msg 
= context.getMessageAsString();
        
// Print message to System.out
        System.out.println("Received synchronous message: " + msg);
        
// Echo transformed message back to sender
        return context.getTransformedMessage();
    }
}

    從onCall()方法可返回任何對象。當(dāng)組件的UMOLifecycleAdapter接收這個(gè)對象時(shí),它首先看看這個(gè)對象是否是一個(gè)UMOMessage;如果這個(gè)對象既不是UMOMessage也不是Null,那么以這個(gè)對象作為有效載荷去創(chuàng)建一個(gè)新的消息。這個(gè)新事件經(jīng)由所配制的出站路有器發(fā)布,如果UMO對象已經(jīng)配制了一個(gè)出站路由器,那么在UMOEventContext實(shí)例中不能調(diào)用setStopFurtherProcessing(true)方法

Mule使用的一個(gè)簡單的事件框架
讓我們把這幾段Mule的代碼放到一起去構(gòu)建一個(gè)簡單的事件框架。這個(gè)框架包含一個(gè)負(fù)責(zé)注冊和注銷事件的管理器,可以接收事件,和負(fù)責(zé)路由同步和異步消息到他們相應(yīng)的服務(wù)。

Mule的虛擬機(jī)協(xié)議要求有一個(gè)放置事件管理器工作目錄META-INF/services/org/mule/providers/vm路徑下的可配制文件,配制文件為協(xié)議定義了大量的組件,例如連接器和調(diào)度工廠。配制文件的內(nèi)容如下:
connector=org.mule.providers.vm.VMConnector
dispatcher.factory
=org.mule.providers.vm.VMMessageDispatcherFactory
message.receiver
=org.mule.providers.vm.VMMessageReceiver
message.adapter
=org.mule.providers.vm.VMMessageAdapter
endpoint.builder
=org.mule.impl.endpoint.ResourceNameEndpointBuilder

一個(gè)簡單的接口定義了事件管理器的公有結(jié)構(gòu):
package com.jeffhanson.mule;

import org.mule.umo.FutureMessageResult;

public interface EventManager{
   
/**
    * Sends an event message synchronously to a given service.
    *
    * 
@param serviceName    The name of the service to which the event
    *                       message is to be sent.
    * 
@param payload        The content of the event message.
    * 
@return Object          The result, if any.
    * 
@throws EventException on error
    
*/
   
public Object sendSynchronousEvent(String serviceName,  Object payload) throws EventException;

   
/**
    * Sends an event message asynchronously to a given service.
    *
    * 
@param serviceName    The name of the service to which the event message is to be sent.
    * 
@param payload           The content of the event message.
    * 
@return FutureMessageResult The result, if any.
    * 
@throws EventException on error
    
*/
   
public FutureMessageResult sendAsynchronousEvent(String serviceName,  Object payload) throws EventException;

   
/**
    * Starts this event manager.
    
*/
   
public void start();

   
/**
    * Stops this event manager.
    
*/
   
public void stop();

   
/**
    * Retrieves the protocol this event manager uses.
    * 
@return
    
*/
   
public String getProtocol();

   
/**
    * Registers a service to receive event messages.
    *
    * 
@param serviceName      The name to associate with the service.
    * 
@param implementation   Either a container reference to the service   or a fully-qualified class name.
    
*/
   
public void registerService(String serviceName, String implementation) throws EventException;

   
/**
    * Unregisters a service from receiving event messages.
    *
    * 
@param serviceName  The name associated with the service to unregister.
    
*/
   
public void unregisterService(String serviceName)  throws EventException;
}

事件管理器類是被封裝在一個(gè)工廠類里,因此,可以依據(jù)需要去改變它的實(shí)現(xiàn)而不會(huì)影響到它的客戶端。事件管理器實(shí)現(xiàn)如下:
package com.jeffhanson.mule;

import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;

import java.util.HashMap;
import java.util.Map;

public class EventManagerFactory{
   
private static HashMap instances = new HashMap();
   
/**
    * Retrieves the event manager instance for a given protocol.
    *
    * 
@param protocol      The protocol to use.
    * 
@return EventManager The event manager instance.
    
*/
   
public static EventManager getInstance(String protocol){
      EventManager instance 
= (EventManager)instances.get(protocol);
      
if (instance == null){
         instance 
= new EventManagerImpl(protocol);
         instances.put(protocol, instance);
      }

      
return instance;
   }

   
/**
    * A concrete implementation for a simple event manager.
    
*/
   
private static class EventManagerImpl  implements EventManager {
      
private UMOManager manager = null;
      
private QuickConfigurationBuilder builder = null;
      
private MuleClient eventClient = null;
      
private String protocol = null;
      
private MuleEndpoint receiveEndpoint = null;
      
private MuleEndpoint sendEndpoint = null;

      
private EventManagerImpl(String protocol){
         
this.protocol = protocol;
      }

      
/**
       * Starts this event manager.
       
*/
      
public void start() {
         
try {
            builder 
= new QuickConfigurationBuilder();
            manager 
= builder.createStartedManager(true,  protocol + "tmp/events");
            eventClient 
= new MuleClient();
            receiveEndpoint 
= new MuleEndpoint(protocol + "tmp/events/receive");
            sendEndpoint 
= new MuleEndpoint(protocol + "tmp/events/send");
         }
         
catch (UMOException e) {
            System.err.println(e);
         }
      }

      
/**
       * Stops this event manager.
       
*/
      
public void stop() {
         
try {
            manager.stop();
         }
         
catch (UMOException e)  {
            System.err.println(e);
         }
      }

      
/**
       * Retrieves the protocol this event manager uses.
       * 
@return
       
*/
      
public String getProtocol()   {
         
return protocol;
      }

      
/**
       * Registers a service to receive event messages.
       *
       * 
@param serviceName      The name to associate with the service.
       * 
@param implementation   Either a container reference to the service
       *                         or a fully-qualified class name
       *                         to use as the component implementation.
       
*/
      
public void registerService(String serviceName,   String implementation) throws EventException  {
         
if (!manager.getModel().isComponentRegistered(serviceName))    {
            
try   {
               builder.registerComponent(implementation,  serviceName,   receiveEndpoint, sendEndpoint);
            }  
catch (UMOException e) {
               
throw new EventException(e.toString());
            }
         }
      }

      
/**
       * Unregisters a service from receiving event messages.
       *
       * 
@param serviceName  The name associated with the service to unregister.
       
*/
      
public void unregisterService(String serviceName)   throws EventException {
         
try  {
            builder.unregisterComponent(serviceName);
         }
         
catch (UMOException e)   {
            
throw new EventException(e.toString());
         }
      }

      
/**
       * Sends an event message synchronously to a given service.
       *
       * 
@param serviceName    The name of the service to which the event
       *                       message is to be sent.
       * 
@param payload        The content of the event message
       * 
@return Object        The result, if any.
       * 
@throws EventException on error
       
*/
      
public Object sendSynchronousEvent(String serviceName,  Object payload)  throws EventException  {
         
try    {
            
if (!manager.getModel().isComponentRegistered(serviceName)){
               
throw new EventException("Service: " + serviceName  + " is not registered.");
            }

            String transformers 
= null;
            Map messageProperties 
= null;
            UMOMessage result 
= eventClient.sendDirect(serviceName,  transformers,  payload, messageProperties);
            
if (result == null)     {
                 
return null;
            }
            
return result.getPayload();
         }  
catch (UMOException e)   {
            
throw new EventException(e.toString());
         }   
catch (Exception e)  {
            
throw new EventException(e.toString());
         }
      }

      
/**
       * Sends an event message asynchronously.
       *
       * 
@param serviceName    The name of the service to which the event
       *                       message is to be sent.
       * 
@param payload        The content of the event message.
       * 
@return FutureMessageResult The result, if any
       * 
@throws EventException on error
       
*/
      
public FutureMessageResult sendAsynchronousEvent(String serviceName,   Object payload)  throws EventException  {
         FutureMessageResult result 
= null;
         
try   {
            
if (!manager.getModel().isComponentRegistered(serviceName))  {
               
throw new EventException("Service: " + serviceName + " is not registered.");
            }

            String transformers 
= null;
            Map messageProperties 
= null;
            result 
= eventClient.sendDirectAsync(serviceName,   transformers,   payload,  messageProperties);
         }   
catch (UMOException e) {
            
throw new EventException(e.toString());
         }

         
return result;
      }
   }
}

Mule框架依據(jù)消息有效載荷的類型來派遣消息。事件框架使用基于有效載荷的派遣機(jī)制,這種派遣機(jī)制把注冊到事件管理器中一般定義的事件方法作為事件接收器。下面的類定義了一個(gè)包含三個(gè)重載的receiveEvent()方法的服務(wù):
package com.jeffhanson.mule;

import java.util.Date;

public class TestService{
   
public void receiveEvent(String eventMessage){
      System.out.println(
"
           TestService.receiveEvent(String) received 
" + "event message:  " + eventMessage + "");
     }

   
public void receiveEvent(Integer eventMessage){
      System.out.println(
"
      TestService.receiveEvent(Integer) received 
"  +"event message:  " + eventMessage + "");
    }

   
public void receiveEvent(Date eventMessage){
      System.out.println(
"
      TestService.receiveEvent(Date) received " + "event message:  " + eventMessage + "");
   }
}

事件管理器客戶端應(yīng)用程序發(fā)送三個(gè)事件到測試服務(wù)中,去測試每一個(gè)receiveEvent()方法??蛻舳藨?yīng)用程序如下:
package com.jeffhanson.mule;

import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.BasicConfigurator;

import java.util.Date;

public class EventClient{
   
static Logger logger = Logger.getLogger(EventClient.class);
   
public static void main(String[] args)  {
      
// Set up a simple configuration that logs on the console.
      BasicConfigurator.configure();
      logger.setLevel(Level.ALL);
      
try  {
         EventManager eventManager 
= EventManagerFactory.getInstance("vm://");
         eventManager.start();

         String serviceName 
= TestService.class.getName();
         String implementation 
= serviceName;

         eventManager.registerService(serviceName, implementation);

         Object result 
=  eventManager.sendSynchronousEvent(serviceName, "A test message");

         
if (result != null)  {
            System.out.println(
"Event result: " + result.toString());
         }

         result 
=   eventManager.sendSynchronousEvent(serviceName, new Integer(23456));
         
if (result != null)  {
            System.out.println(
"Event result: " + result.toString());
         }

         result 
=  eventManager.sendSynchronousEvent(serviceName, new Date());

         
if (result != null) {
            System.out.println(
"Event result: " + result.toString());
         }

         eventManager.stop();
      }
      
catch (EventException e)
      {
         System.err.println(e.toString());
      }
   }
}

Mule平臺簡化和抽象了前面所敘述框架的事件方面的處理,使得你發(fā)送和接收穿越一個(gè)層級結(jié)構(gòu)的同步和異步消息時(shí),不需要知道下層系統(tǒng)的細(xì)節(jié)。工廠模式和SOA準(zhǔn)則的應(yīng)用,則使得這個(gè)框架有了一個(gè)松散耦合和可擴(kuò)展的設(shè)計(jì)。

總結(jié)
當(dāng)服務(wù)和進(jìn)程需要穿越多層結(jié)構(gòu),使用多種協(xié)議去交互時(shí),設(shè)計(jì)一個(gè)有效地事件驅(qū)動(dòng)的軟件系統(tǒng)可能變得復(fù)雜了??墒牵粋€(gè)使用標(biāo)準(zhǔn)模式包含適當(dāng)事件管理層的面向服務(wù)架構(gòu)能減少,甚至消除這些問題。

Mule 平臺提供API,組件和抽象對象,這些都可以用于去建立一個(gè)強(qiáng)大,健壯,事件驅(qū)動(dòng)的有著良好的伸縮性和可維護(hù)性的系統(tǒng)。

@import url(http://www.tkk7.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);