本文概述了如何使用 Java 消息傳遞系統(JMS)進行大型文件的復制。Dan Drasin 描
述了解決 Applied Reasoning 公司客戶的分布式數據問題的方案,并提供了基于 JMS
的解決方案的實現細節。他討論了其中的優點、一些潛在缺陷以及將 IBM MQSeries(現
在稱為 WebSphere MQ)成功設置為 JMS 服務器的一些實際指示信息。
背景
在思考消息傳遞解決方案時,您可能會想到一個通過遠程消息調用機制來集成兩個不同
應用程序的系統。一般來講,對于不常通信的分布式實體以及數據傳輸量不是很多這樣
的情況,常常使用這種耦合。較經典的示例是,連接到異構后端和入口的同構接口,這
些后端和入口指派進行用戶請求的后端處理,然后為最終用戶表示而對那些請求進行重
新格式化。
消息傳遞方法中的公共線程一直有這樣的假定:雖然消息傳遞解決方案在系統之間提供
健壯、高度可用的通信,但它基本上效率很低,只用來作為在無法避免與外部系統通信
時的最后一種手段。在出現遠程方法調用(RMC)時關于消息傳遞的這種觀點就開始流行
一直到出現了更現代的象 CORBA 和 DCOM 那樣的消息傳遞解決方案,而且,通常所應用
的消息傳遞只局限于解決幾類問題。
目標
在過去的十年中,人們對分布式系統需求有了更深入的理解。新興技術(象 Java 和 .
NET)已經包含了代碼分布來作為它們基本編程模型的一部分。通過這樣做,這些技術已
將高度可用性和容錯性融入到消息傳遞中,同時鼓勵那些提供解決方案的供應商交付一
些系統,這些系統在更廣范圍的問題上考慮性能。
近來我們公司被要求實現文件分布和復制的解決方案,在以前這樣的方案需要集成安全
的 FTP、數據庫復制和其它一次性解決方案的定制系統。我們沒有一味地埋頭按照定制
開發的道路前進,而是研究了將最新的消息傳遞解決方案應用到這個問題的可能性。我
們發現 JMS 不僅為信息傳送提供必要的基礎結構,而且它還能處理我們客戶要求的、與
服務質量、安全性、可靠性和性能有關的所有基礎結構問題。本文描述了我們團隊面臨
的挑戰,以及 JMS(以 MQSeries 的形式)如何讓我們滿足并超越客戶的要求。
問題
我們的客戶面臨一個重大的分布式數據難題,在全國范圍內有許多呼叫中心,在全國各
地的呼叫中心里接線員要記錄與客戶之間的交互。必須快速可靠地在遠程數據中心為這
些記錄建立索引并存檔。建立索引和存檔的存儲過程不能影響接線員的系統記錄和存儲
接線員正在與客戶交互的信息的能力。該客戶已經有了一個包含組合起來的代碼、VPN
和其它技術的系統。但是,現有的解決方案遠遠達不到性能和可靠性上的目標,并且它
是一種拙劣的技術,難以理解并且維護費用很高。
在開發替代客戶原有系統時,我們考慮了 JMS 和多種非 JMS 的解決方案,尤其是那些
基于 FTP 和安全復制(SCP)的解決方案。然而,非 JMS 解決方案有兩個主要缺點:
它們對于安全性方面的缺陷一籌莫展。(FTP 上的安全性漏洞已經人人皆知,并且人們
對此已廣泛地作了記載。如果需要這方面的例子,請參閱參考資料。)
它們提供的基礎結構只適用于實際的數據傳送,而對于處理可靠性、容錯性、安全性、
平臺獨立性以及性能優化等問題,需要定制開發來解決。
我們團隊最后得出結論,對于添加這些額外的特性所需的開發工作是讓人望而卻步的,
因此我們決定選用 JMS 解決方案,它可以擺脫這些問題。
解決方案
我們開發了一個基于 JMS 的系統,它:
為已記錄的多媒體文件提供可靠存檔
支持可擴展性,可以使多個數據中心接收文件
支持對其它數據類型進行存檔
我們這里正討論的文件比以前那些涉及消息傳遞解決方案的項目中傳送的數據還要大(
50K - 500K)。我們第一個任務是確保數據大小不會影響 JMS 解決方案。通過測試系統
傳遞各種大小的消息有效負載時的性能,我們評估了包括 IBM MQSeries 在內的許多 J
MS 解決方案。結果顯示:經過適當配置,大小達到 1 兆的消息不會對整個系統性能產
生顯著影響。因為常識認為消息傳遞解決方案只適用于定期的、小的有效負載,所以我
們的結果是一個重大發現。我們繼續分析系統的體系結構(圖 1 中概述了此體系結構)
,它可以提供客戶需要的安全性、高可用性和可靠性。
圖 1. 高級系統體系結構
現有的基礎結構在每個客戶機上有一個系統,當接線員與用戶之間進行交互時,它創建
多媒體文件,以此作為響應。此外,還需對這些文件進行存檔。我們的系統啟動一個進
程(運行在每個機器上)并在已知目錄中查找這些文件。當檢測到新文件時,進程將它
們打包成 JMS 有效負載并發送到其中一個數據中心的 JMS 服務器以便傳遞。一旦 JMS
服務器確認收到,則除去發送方中的這些文件。JMS 服務器將該數據傳送到數據中心內
的一個可用處理程序上,進行存檔。
主要概念
JMS 是特定于 Java 的消息傳遞和排隊的實現。在消息傳遞和排隊中有兩個基本思想:
系統通過使用不連續的數據包進行通信,這些數據包都有一個有效負載(即要傳送的信
息)和屬性(即該信息的特征以及它應如何通信)。這個數據包稱為 消息。
消息不是被發送給系統,而是被發送到一個獨立的保存區域。可以根據您的需要確定保
存區域的數量,通過唯一的名稱,可以標識并定位它們。每個保存區域都可以接收消息
,并且根據配置的不同,該區域將每個消息要么傳遞給所有感興趣的系統(發布-訂閱)
,要么傳遞給第一個感興趣的系統(點對點)。這個保存區域稱為 目的地。
我們構建的系統采用點對點的目的地,在 JMS 中稱為隊列。排隊是圖 1 中顯示的系統
設計的一個重要方面。該圖顯示了消息正從 JMS 代理直接傳送到接收方的客戶機上,但
這并不十分準確。實際上,消息被傳送到一個隊列中,接收方客戶機從隊列中檢索它們
。稍后我們研究實現細節時,這個區別將變得非常重要,因為它讓系統并行地處理收到
的消息。
跨平臺和交叉供應商
對我們客戶機來說盡量減少對某家供應商的依賴,這意味著,我們所設計的代碼應該使
由于更改了 JMS 供應商而帶來的影響降至最低,這是十分重要的。JMS 的一個主要優點
是它以廣泛的業界支持和開放標準為基礎,因此有了正確設計的代碼,我們就可以讓系
統使用任何 JMS 系統。(可以對現有系統進行直接改進,專門設計來使系統在某套硬件
上運行并能與特定于供應商的解決方案相匹配。)
通過將所有特定于供應商的調用封裝在稱為 JMSProvider 的類中,就可以輕松實現平臺
獨立性。這些 Provider 類處理特定于供應商的問題,例如工廠查詢、錯誤處理、連接
創建和消息特性設置等。請參閱下面清單 1 中的示例代碼。
清單 1. 在類 ar.jms.JmsProvider 中
public QueueConnection createConnection() throws JMSException {
return getConnectionFactory().createQueueConnection(getUserName(),
getPassword());
}
通過利用“Java 命名和目錄接口(JNDI)”,我們將特定于供應商的設置存儲在一個資
源庫(例如,LDAP 庫)中,這樣實際代碼就幾乎不需要特定于供應商的引用。只需要少
量特定于供應商的代碼來處理一些特性,但是可以將這樣的代碼限定于一些“適配器”
類,并將它保存在應用程序代碼之外。請參閱下面清單 2 中的示例代碼。因為 JMS 被
設計用來方便地使用 JNDI,所以與其它解決方案相比,這是另一個直接優點 ― 配置信
息的集中存儲不僅可以保存基于文本的信息,而且還可以存儲已配置的對象。
清單 2. 在類 ar.jms.JmsProvider 中
public final static String
CONNECTION_FACTORY_LOOKUP_NAME_KEY = "CONNECTION_FACTORY_LOOKUP_NAME";
public final static
String FILE_TRANSFER_QUEUE_LOOKUP_NAME_KEY =
"FILE_TRANSFER_QUEUE_LOOKUP_NAME";
public final static String
JMS_PROVIDER_CLASS_KEY = "JMS_PROVIDER_CLASS";
public void init() throws NamingException {
InitialContext jndi = createInitialContext();
initConnectionFactory(jndi);
initFileTransferQueue(jndi);
}
public QueueConnection createConnection() throws JMSException {return
getConnectionFactory().createQueueConnection(getUserName(),
getPassword());
}
public void initConnectionFactory(InitialContext jndi) throws
NamingException {
setConnectionFactory((QueueConnectionFactory)jndi.lookup
(getProperties().getProperty(CONNECTION_FACTORY_LOOKUP_NAME
_KEY)));
}
public void initFileTransferQueue(InitialContext jndi) throws
NamingException {
setFileTransferQueue((Queue) jndi.lookup
(getProperties().getProperty(FILE_TRANSFER_QUEUE_LOOKUP_NAM
E_KEY)));
}
跳出傳統模式,JMS 解決方案允許以可靠的方式傳送消息,即一旦確認已將消息傳送到
JMS 服務器,就將它傳送至尋址到的目的地(隊列)。MQSeries 也不例外。一旦成功
執行了將消息發送到服務器的代碼,客戶機就保證目的地最終會接收到消息,即使所討
論的服務器在處理過程中出現故障(如果目的地暫時不可用,或者 JMS 服務器死機等等
)。請參閱下面清單 3 中的示例代碼。下面代碼中的類實際上負責一旦它確定需要發送
文件,就執行數據的發送。
通過將消息配置為持久消息,我們可以保證一旦目的地(隊列)接收到消息,那么消息
將保留在那里直到它在該隊列中被檢索為止 ― 即使在系統有故障期間。因此,一旦安
全地將消息傳送到本地 JMS 服務器,就可以刪除它了。不能過高估計克服系統故障的能
力;對周期性系統故障的處理是開發分布式存檔解決方案最重要的問題之一。客戶現有
系統上處理故障情況的代碼很復雜很脆弱,而且對這些故障的處理和維護費用很高。通
過一個健壯的、經測試成功的商業解決方案,JMS 使我們能解決所有這些問題。
清單 3. 來自類 ar.jms.file.send.ConnectionElement
public void sendMessage(byte[] payload, boolean persistent) throws
SendFailedException {
QueueSender sender = null;
try {
Message message = createMessage(payload);
sender = createSender
(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
sender.send(message);
getClient().getLogService().logInfo(getName() +
" sent message " + message.getJMSMessageID() + ".");
} catch (JMSException exception) {
getClient().getLogService().logError
("JMS exception processing " + getName(),exception);
stop();
throw new SendFailedException("JMS Message Send Failed");
}
try {
sender.close();
} catch (JMSException ignore) {
getClient().getLogService().logInfo(getName() + " failed to
close sender. Processing will continue.");
}
}
這個解決方案的關鍵是配置 JMS 消息和服務器來同時提供令人滿意的性能和服務質量。
JMS 規范定義了配置選項,并通過所有商業解決方案實現它們。但是,配置的確切方法
根據不同的供應商而有所不同。
設置
我們創建的體系結構和系統具有通用性且很強大。但是,對于一些移動部件,必須使用
正確的方式配置并鉤連它們。以下內容是有關將 MQSeries 成功地設置為 JMS 服務器的
概述、一些潛在缺陷和實際的指示信息。
對于 MQSeries,首先設置 JNDI 服務器來檢索特定于實現的設置,在這種情況下是 JM
S 連接工廠(JMS Connection Factory)。有許多不同方法來實現這個操作,但適宜的
通用選項是輕量級目錄訪問協議(LDAP)服務器。我們選擇使用 Qualcomm SLAPD。一旦
安裝好并運行該服務器,就可以用 MQSeries 管理工具(JMSAdmin.bat)來設置該服務
器并將其作為 MQ 對象信息庫來使用。請參閱參考資料,獲取有關講述該過程的實用書
籍的鏈接。同時,在設置期間,要特別注意在 IBM MQSeries 之上設置 JMS 的 IBM 文
檔,這很重要。這個過程涉及創建一些隊列和其它對象,這些隊列和對象是特定于 JMS
使用并且不屬于標準 MQSeries 安裝的。
完成 JNDI/LDAP 和 JMS 服務器的設置后,就可以準備配置客戶機了。第一步是理解 J
MS 如何與 IBM 的標準 MQSeries 實現交互。MQSeries 的 Java 客戶機能以兩種模式之
一進行交互:客戶機和綁定模式。只能通過 Java Applet 來使用客戶機模式,而綁定模
式取決于客戶機上的 DLL 或者對象庫。因為實現的特性,當使用用于 JMS 連接信息的
LDAP 服務器時,只能使用綁定模式。(不清楚為什么有這個限制,但它確實存在。)
因此,將用戶登錄和密碼存儲在一個全局位置(com.ibm.mq.MQEnvironment.class)而
不是在連接時傳遞它們。要解決這些供應商問題,我們創建了標準 JmsProvider 類(稱
為 MQSeriesProvider)的子類。這個類將完成的唯一操作是覆蓋如何創建連接。不象清
單 1 中那樣
newQueueConnection = getConnectionFactory().createQueueConnection(getUserNam
e(),getPassword));
進行調用,我們必須調用
newQueueConnection = getConnectionFactory().createQueueConnection();
最后,您需要將特定于 JMS 的元素(如隊列、隊列管理器、隊列工廠等等)提供給客戶
機。現在,使用 LDAP 和 JNDI 的原因就變得很明顯:我們使用 LDAP 服務器來存儲這
些元素并使用外部文件保存那些 LDAP 對象的鍵。LDAP 服務器可以作為 JNDI 服務器并
通過返回存儲的對象來響應名稱查詢。這就是清單 2 中代碼的工作原理。JMS 元素的名
稱可從類的靜態變量(對于缺省名稱)或者外部文件(使用非缺省的其它名稱)中獲取
。簡而言之,向 LDAP 服務器請求鍵(我們正討論的)中存儲的對象,并返回我們感興
趣的 JMS 對象(在這種情況下)。
我們基于 JMS 的解決方案通過使用現有的組件更方便地實現統一的、跨平臺和交叉供應
商的配置環境。現在我們的代碼已盡可能地成為獨立于特定于平臺和特定于供應商的設
置。
應用程序
應用程序中有兩個關鍵組件:發送器和接收器。發送器啟動一個后臺程序,它在目錄中
輪詢需要存檔的文件,而接收器只是等待將要傳遞的 JMS 消息,然后將該消息中包含的
文件存檔。JMS API 使我們幾乎無需關注正使用的特定 JMS 實現就可以定義這些組件。
發送器由三個主要部件組成:
JMSProvider,用于創建連接
ConnectionPool,用于獲取現有的空閑連接(我們稱之為 JMSConnection)
輪詢程序,監視需要傳送的文件。
在啟動時,使用 JMSProvider 來創建一些到 JMS 服務器的準備就緒的連接。這些連接
放置在池中,然后啟動輪詢程序。當輪詢程序檢測到需要傳送文件時,它就創建一個獨
立線程來處理這個文件。(可以通過派生(forking),創建一個獨立的線程來創建消息
和進行傳送操作,描述該過程非常簡單。但在實際應用中,常常將合用(pooling)與循
環組合起來使用,這樣可以確保很少創建新線程,而是重用線程。但是,那個過程相當
復雜,過多的說明會分散本文的中心主題 ― JMS。)
在獨立線程中,輪詢程序接著從連接池中獲取 JMSConnection,用它來創建一個 Bytes
Message,并將這個文件的二進制內容放入那個消息中。最后這個消息查找到接收器,并
發送到 JMS 服務器,接著將 JMSConnection 返回給 ConnectionPool。這個發送過程的
部分步驟顯示在下面的圖 2 中。
圖 2. 發送器過程
接收器是一個較簡單的組件;它啟動一些 FileListener 來等待將要放置在接收器隊列
中的消息。下面的清單 4 中的代碼顯示了 FileListener 設置處理過程。圖 6 中的類
實際上負責從隊列中檢索消息并對它們進行存檔。JMS 保證隊列發送每個消息的次數僅
一次,所以我們可以安全啟動許多不同的 FileListener 線程并且知道每個消息(因此
每個文件)只處理一次。這個保證是使用基于 JMS 解決方案的另一個重要優點。在自己
設計的解決方案中開發這樣的功能(比如基于 FTP 的功能),花銷很大且易出錯。
清單 4:來自類 ar.jms.file.receive.FileListener public void startOn(Queue qu
eue) {
setQueue(queue);
createConnection();
try {
createSession();
createReceiver();
getConnection().start(); // this starts
the queue listener
} catch (JMSException exception) {
// Handle the exception
}
}
public void createReceiver() throws javax.jms.JMSException {
try {
QueueReceiver receiver = getSession().
createReceiver(getQueue());
receiver.setMessageListener(this);
} catch (JMSException exception) {
// Handle the exception
}
}
public void createSession() throws JMSException {
setSession(getConnection().
createQueueSession(false, Session.AUTO_ACKNOWLEDGE));
}
public void createConnection() {
while (!hasConnection()) {
try {
setConnection(getClient().createConnection());
} catch (JMSException exception) {
// Connections drop periodically on the
internet, log and try again.
try {
Thread.sleep(2000);
} catch
(java.lang.InterruptedException ignored) {
}
}
}
}
以回調的方式編寫消息處理代碼,回調是當將消息傳遞給 FileListener 時,JMS 自動
調用的方法。這個消息的代碼顯示在下面的清單 5 中。
清單 5. 來自類 ar.jms.file.receive.FileListener public void onMessage(Messag
e message) {
BytesMessage byteMessage = ((BytesMessage) message);
OutputStream stream =
new BufferedOutputStream(
new FileOutputStream(getFilenameFor(message)));
byte[] buffer = new byte[getFileBufferSize()];
int length = 0;
try {
while ((length = byteMessage.readBytes(buffer)) != -1) {
stream.write(buffer, 0, length);
}
stream.close();
} catch (JMSException exception) {
// Handle the JMSException
} catch (IOException exception) {
// Handle the IOException
}
}
在設置接收器時要記住一條訣竅:在所有 FileListener 啟動后,確保啟動這些 FileL
istener 的原始線程繼續運行。這是必要的,因為某些 JMS 實現在守護程序的線程中啟
動 QueueListener。所以,如果正在運行的唯一線程是守護程序的線程,那么 Java 虛
擬機(JVM)可能會過早地意外退出。下面的清單 6 顯示了一些防止這種情況發生的簡
單代碼。
清單 6. 至少使一個非守護程序的線程保持運行 public static void main(String[]
args) {
ReceiverClient newReceiverClient = new ReceiverClient();
newReceiverClient.init();
setSoleInstance(newReceiverClient);
while(!finished) { // This prevents the VM from exiting early
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
}
結束語
在該項目的最初實現之后,我們添加了一些功能,象消息壓縮、當位置無法到達時的自
動恢復、聯合消息代理、安全性、健壯的日志記錄、管理等等。添加這些功能很容易,
因為 JMS 提供了開放模型,而且我們的體系結構也很健壯。構建整個系統花了六個星期
的時間,并且還很快地替換了客戶一直使用的現有的、勞動密集型的系統。在這些天里
,系統已經超出了所有的基準測試程序的標準并且已更正了原來系統遺留下來的錯誤。
這個項目不單超出了客戶的期望,還證明了 JMS 是一個可行的解決方案,不僅適用于小
型、面向消息的應用程序,而且還適用于大規模的、重要任務的數據傳送操作。
參考資料
獲取更多信息或者下載 Qualcomm slapd。
訪問 JMS 主頁。
回顧 FTP 的安全性問題。
查閱 Giotta 等人編寫的 JMS 設置一書:“Professional JMS Programming”。
學習有關 IBM MQSeries 的更多內容或者下載測試版本。
閱讀 developerWorks 上的相關文章:“Java theory and practice: Should you use
JMS in your next entERPrise application?”。
閱讀 developerWorks 上 Daniel 以前的文章:“The profound impact of hardware,
software, and networking decisions”。