IBM的WebSphere?MQ產(chǎn)品可以用來方便地實(shí)現(xiàn)分布式異構(gòu)系統(tǒng)之間的消息傳遞。對(duì)于大型的分布式系統(tǒng),使用MQ進(jìn)行數(shù)據(jù)通信是可以說是非常有效的,而且適用于異構(gòu)環(huán)境(如NT和多種UNIX之間通信)。本文主要介紹:MQ的核心組件介紹、MQ環(huán)境的搭建以及利用JAVA對(duì)MQ隊(duì)列管理器的操作的程序設(shè)計(jì),希望能起到拋磚引玉的作用。
第一部分、MQ的核心組件介紹
??? MQ的核心組件包括:隊(duì)列管理器(QueueManager)、隊(duì)列(Queue)、通道(Channel)、消息(Message)和集群(Cluster)。
隊(duì)列管理器(QueueManager)提供隊(duì)列服務(wù),管理屬于該隊(duì)列管理器的隊(duì)列和通道等所有MQ對(duì)象。
隊(duì)列(Queue)是用于存儲(chǔ)消息(Message)的數(shù)據(jù)結(jié)構(gòu),有四種類型:本地隊(duì)列(LocalQueue)、遠(yuǎn)程隊(duì)列(RemoteQueue)、別名隊(duì)列(AliasQueue)和模型隊(duì)列(ModelQueue),最常用到的是本地隊(duì)列和遠(yuǎn)程隊(duì)列。
通道(Channel)是提供了從一個(gè)隊(duì)列管理器到其他隊(duì)列管理器的數(shù)據(jù)傳輸路徑。通道類型有若干種,其中常用的是發(fā)送方通道(SenderChannel)和接收方通道(ReceiverChannel)。
消息(Message)是應(yīng)用程序之間傳遞的一系列字節(jié)數(shù)據(jù),MQ傳遞的消息有兩部分組成:消息描述符(MessageDescriptor)和應(yīng)用數(shù)據(jù)(ApplicationData)。默認(rèn)最大傳遞的消息大小是4MB,可以根據(jù)需要進(jìn)行設(shè)置,最大可到100MB。
集群(Cluster)是分布式網(wǎng)絡(luò)上的多個(gè)隊(duì)列管理器的集合。(本文不涉及集群的具體內(nèi)容)
第二部分、MQ環(huán)境的搭建
本文搭建的環(huán)境以Windows平臺(tái)為例,涉及其他平臺(tái)的請(qǐng)讀者查閱相關(guān)文檔。
具體搭建步驟:
1、根據(jù)安裝向?qū)О惭bIBM?WebSphere?MQ?v5.3軟件,安裝路徑為:D:\IBM\WebSphere?MQ。
2、安裝成功后,請(qǐng)使用命令echo?%classpath%檢查classpath變量中是否已經(jīng)把D:\IBM\WebSphere?MQ\Java\lib下面的jar文件包含進(jìn)來,如沒有包括請(qǐng)進(jìn)行手工添加,本文要用到的2個(gè)關(guān)鍵的是:com.ibm.mq.jar和connector.jar。使用echo?%path%檢查path變量中是否已經(jīng)把D:\IBM\WebSphere?MQ\bin包含進(jìn)來,如沒有包括請(qǐng)進(jìn)行手工添加。
3、創(chuàng)建一個(gè)配置文本文件,文件名為config.txt,內(nèi)容如下(請(qǐng)讀者到附件下載):
*?更改QM的字符集編碼(CCSID)
????ALTER?QMGR?FORCE?CCSID(1381)
*?定義本地隊(duì)列
????DEFINE?QLOCAL('LQ_SAMPLE')?REPLACE?+
???????USAGE(normal)?+
???????DEFPSIST(YES)?
4、創(chuàng)建一個(gè)批處理文件,文件名為mqsetup.bat,內(nèi)容如下(請(qǐng)讀者到附件下載):
rem?創(chuàng)建缺省隊(duì)列管理器,擁有100個(gè)句柄,使用線性循環(huán)日志,容量為?1024?×?4?K/文件,主文件10個(gè),輔文件20個(gè)
echo?Creating?QM_SAMPLE?
crtmqm?-t?5000?-h?100?-lc?-lf?1024?-lp?10?-ls?20?-q?QM_SAMPLE
rem?設(shè)置cpu個(gè)數(shù)為1
setmqcap?1
rem?啟動(dòng)隊(duì)列管理器
echo?Starting?Queue?Manager
strmqm?QM_SAMPLE
rem?從配置文件中讀入初始化命令
echo?Running?config?
runmqsc?QM_SAMPLE?<?config.txt
rem?停止隊(duì)列管理器
amqmdain?end?QM_SAMPLE
rem?將隊(duì)列管理器設(shè)置為自動(dòng)啟動(dòng)
amqmdain?auto?QM_SAMPLE
rem?創(chuàng)建隊(duì)列偵聽器,使用1414端口
amqmdain?crtlsr?QM_SAMPLE?-t?TCP?-p?1414
rem?修改MQ參數(shù),采用AdoptNewMCA方式
amqmdain?reg?QM_SAMPLE?-c?add?-s?Channels?-v?AdoptNewMCA=ALL
rem?修改MQ參數(shù),采用KeepAlive方式
amqmdain?reg?QM_SAMPLE?-c?add?-s?TCP?-v?KeepAlive=Yes
rem?重新啟動(dòng)隊(duì)列管理器
amqmdain?start?QM_SAMPLE
5、運(yùn)行mqsetup.bat,檢查運(yùn)行結(jié)果輸出是否無誤,如有錯(cuò)誤,請(qǐng)仔細(xì)根據(jù)上述步驟進(jìn)行檢查并糾錯(cuò)。
6、在命令窗口中,輸入dspmq,看是否顯示如下結(jié)果:
QMNAME(QM_SAMPLE)???????????????????????????????????STATUS(正在運(yùn)行)
7、在命令窗口中,輸入runmqsc回車,進(jìn)入mq交互操作環(huán)境,輸入display?queue(LQ_SAMPLE),看是否顯示如下結(jié)果:
AMQ8409:?顯示隊(duì)列細(xì)節(jié)。
???DESCR(WebSphere?MQ?Default?Local?Queue)
???PROCESS(?)??????????????????????????????BOQNAME(?)
???INITQ(?)????????????????????????????????TRIGDATA(?)
???CLUSTER(?)??????????????????????????????CLUSNL(?)
???QUEUE(LQ_SAMPLE)????????????????????????CRDATE(2006-10-31)
???CRTIME(16.17.01)????????????????????????ALTDATE(2006-10-31)
8、輸入end退出mq交互操作環(huán)境。
自此,NT平臺(tái)上的最基本的MQ環(huán)境搭建完成了。
第三部分、利用JAVA對(duì)MQ隊(duì)列管理器的操作的程序設(shè)計(jì)
本文涉及的程序在JDK?1.4.2上測(cè)試通過。文件名為MQSample.java,程序內(nèi)容如下:
import?java.io.IOException;
import?com.ibm.mq.MQC;
import?com.ibm.mq.MQException;
import?com.ibm.mq.MQGetMessageOptions;
import?com.ibm.mq.MQMessage;
import?com.ibm.mq.MQPutMessageOptions;
import?com.ibm.mq.MQQueue;
import?com.ibm.mq.MQQueueManager;
public?class?MQSample{
????//定義隊(duì)列管理器和隊(duì)列的名稱
????private?static?String?qmName;?
????private?static?String?qName;
????
????public?static?void?main(String?args[])?{
????????try{
????????????//第一個(gè)參數(shù)是隊(duì)列管理器名,第二個(gè)參數(shù)是隊(duì)列名
????????????qmName?=?args[0].trim();
????????????qName?=?args[1].trim();
????????}catch(Exception?e){
????????????System.out.println("USAGE:?java?MQSample?隊(duì)列管理器名?隊(duì)列名");
????????????System.exit(0);
????????}
????????
????????try?{
????????????//定義并初始化隊(duì)列管理器對(duì)象并連接?
????????????MQQueueManager?qMgr?=?new?MQQueueManager(qmName);?
????????????//?設(shè)置將要連接的隊(duì)列屬性
????????????//?Note.?All?WebSphere?MQ?Options?are?prefixed?with?MQC?in?Java.?
????????????int?openOptions?=?MQC.MQOO_INPUT_AS_Q_DEF?|?MQC.MQOO_OUTPUT;?
????????????//連接隊(duì)列?
????????????MQQueue?localQ?=?qMgr.accessQueue(qName,?openOptions);?
????????????
????????????//定義一個(gè)簡單的消息
????????????MQMessage?putMessage?=?new?MQMessage();?
????????????putMessage.writeUTF("Hello?World!");?
????????????//設(shè)置寫入消息的屬性(默認(rèn)屬性)
????????????MQPutMessageOptions?pmo?=?new?MQPutMessageOptions();?
????????????
????????????//將消息寫入隊(duì)列?
????????????localQ.put(putMessage,pmo);?
????????????????????????
????????????MQMessage?retrievedMessage?=?new?MQMessage();
????????????retrievedMessage.messageId?=?putMessage.messageId;?
????????????//設(shè)置取出消息的屬性(默認(rèn)屬性)
????????????MQGetMessageOptions?gmo?=?new?MQGetMessageOptions();?
????????????//?從隊(duì)列中取出消息
????????????localQ.get(retrievedMessage,?gmo);?
????????????String?msgText?=?retrievedMessage.readUTF();
????????????System.out.println("The?message?is:?"?+?msgText);?
????????????//關(guān)閉隊(duì)列
????????????localQ.close();?
????????????//從隊(duì)列管理器斷開?
????????????qMgr.disconnect();?
????????}catch?(MQException?ex)?{?
????????????System.out.println("A?WebSphere?MQ?error?occurred?:?Completion?code?"?
????????????+?ex.completionCode?+?"?Reason?code?"?+?ex.reasonCode);?
????????}catch?(IOException?ex)?{?
????????????System.out.println("An?error?occurred?whilst?writing?to?the?message?buffer:?"?+?ex);?
????????}catch(Exception?ex){
????????????ex.printStackTrace();
????????}
????}
}
以上程序接受2個(gè)輸入?yún)?shù),第一個(gè)參數(shù)是隊(duì)列管理器名,第二個(gè)參數(shù)是隊(duì)列名。首先初始化MQQueueManager對(duì)象,以隊(duì)列管理器的名稱作為參數(shù)。這里用的是最簡單的MQQueueManager,只能連接本機(jī)的隊(duì)列管理器。推薦使用MQQueueManager(String?qmName,?Hashtable?hashtable)構(gòu)造函數(shù),如下:
private?static?Hashtable?properties?=?new?Hashtable();
properties.put("hostname",?"主機(jī)名");
properties.put("port",?new?Integer(1414));
......
MQQueueManager?qMgr?=?new?MQQueueManager(qmName,properties);?
可以使應(yīng)用程序訪問MQ服務(wù)器的特定端口。
然后通過QueueManager的accessQueue()方法連接隊(duì)列,該方法返回MQQueue對(duì)象。然后定義了MQMessage對(duì)象,通過writeUTF()將字符串寫入消息,進(jìn)而通過MQQueue的put()方法將消息放入隊(duì)列中,然后通過get()方法取出消息,最后顯示在console上。在程序結(jié)尾,執(zhí)行MQQueue的close()方法關(guān)閉隊(duì)列,執(zhí)行MQQueueManager的disconnect()斷開和隊(duì)列管理器的連接。
對(duì)MQSample.java編譯后,輸入java?MQSample?QM_SAMPLE?LQ_SAMPLE運(yùn)行,得出如下結(jié)果:
The?message?is:?Hello?World!
在作者調(diào)試以上程序時(shí),出現(xiàn)了一個(gè)錯(cuò)誤:java.lang.NoClassDefFoundError:?javax/resource/ResourceException
經(jīng)過在IBM網(wǎng)站上進(jìn)行搜索,說是classpath中沒有指定connector.jar文件,但是我已經(jīng)在Eclipse開發(fā)環(huán)境中進(jìn)行設(shè)置。
通過仔細(xì)分析,終于發(fā)現(xiàn)是由于JDK的緣故(我用的是WebSphere6.0里的JDK),切換到SUN的JDK就正常了。
所以,如果你需要使用IBM的JDK的話,那么要把MQ的這幾個(gè)包覆蓋到IBM?JDK的相關(guān)路徑下。
到此,讀者應(yīng)該了解了MQ有哪些主要的對(duì)象,環(huán)境怎么搭建(請(qǐng)仔細(xì)研究config.txt和mqsetup.bat文件),以及如何對(duì)MQ的對(duì)象進(jìn)行操作。
如果讀者對(duì)以上的內(nèi)容有任何疑問,可以和我聯(lián)系,qianh@cntmi.com?