<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年6月>
    2930311234
    567891011
    12131415161718
    19202122232425
    262728293012
    3456789

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    概述

    RabbitMQ Java client 將com.rabbitmq.client作為其頂層包. 關(guān)鍵類和接口有:

    • Channel
    • Connection
    • ConnectionFactory
    • Consumer
    協(xié)議操作可通過Channel接口來進(jìn)行.Connection用于開啟channels,注冊connection生命周期事件處理, 并在不需要時(shí)關(guān)閉connections.
    Connections是通過ConnectionFactory來初始化的,在ConnectionFactory中,你可以配置不同的connection設(shè)置,如:虛擬主機(jī)和用戶名等等.

    Connections 和 Channels

    核心API類是Connection和Channel, 它們代表對應(yīng)AMQP 0-9-1 connection 和 channel. 在使用前,可像下面這樣來導(dǎo)入:

    import com.rabbitmq.client.Connection; 
    import com.rabbitmq.client.Channel;

    連接到broker

    下面的代碼會(huì)使用給定的參數(shù)連接到AMQP broker:

    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(portNumber);
    Connection conn = factory.newConnection();

    也可以使用URIs 來設(shè)置連接參數(shù):

    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
    Connection conn = factory.newConnection();


    Connection 接口可用來打開一個(gè)channel:

    Channel channel = conn.createChannel(); 

    channel現(xiàn)在可用來發(fā)送和接收消息,正如后續(xù)章節(jié)中描述的一樣.

    要斷開連接,只需要簡單地關(guān)閉channel和connection:

    channel.close(); conn.close();

    關(guān)閉channel被認(rèn)為是最佳實(shí)踐,但在這里不是嚴(yán)格必須的 - 當(dāng)?shù)讓舆B接關(guān)閉的時(shí)候,channel也會(huì)自動(dòng)關(guān)閉.

    使用 Exchanges 和 Queues

    采用交換器和隊(duì)列工作的客戶端應(yīng)用程序,是AMQP高級別構(gòu)建模塊。在使用前,必須先聲明.聲明每種類型的對象都需要確保名稱存在,如果有必要須進(jìn)行創(chuàng)建.

    繼續(xù)上面的例子,下面的代碼聲明了一個(gè)交換器和一個(gè)隊(duì)列,然后再將它們進(jìn)行綁定.

    channel.exchangeDeclare(exchangeName, "direct", true); 
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, routingKey);

    這實(shí)際上會(huì)聲明下面的對象,它們兩者都可以可選參數(shù)來定制. 在這里,它們兩個(gè)都沒有特定參數(shù)。

    1. 一個(gè)類型為direct,且持久化,非自動(dòng)刪除的交換器
    2. 采用隨機(jī)生成名稱,且非持久化,私有的,自動(dòng)刪除隊(duì)列

    上面的函數(shù)然后使用給定的路由鍵來綁定隊(duì)列和交換器.

    注意,當(dāng)只有一個(gè)客戶端時(shí),這是一種典型聲明隊(duì)列的方式:它不需要一個(gè)已知的名稱,其它的客戶端也不會(huì)使用它(exclusive),并會(huì)被自動(dòng)清除(autodelete).
    如果多個(gè)客戶端想共享帶有名稱的隊(duì)列,下面的代碼應(yīng)該更適合:

    channel.exchangeDeclare(exchangeName, "direct", true); 
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);

    這實(shí)際上會(huì)聲明:

    1. 一個(gè)類型為direct,且持久化,非自動(dòng)刪除的交換器
    2. 一個(gè)已知名稱,且持久化的,非私有,非自動(dòng)刪除隊(duì)列

    注意,Channel API 的方法都是重載的。這些 exchangeDeclarequeueDeclare 和queueBind 都使用的是預(yù)設(shè)行為.
    這里也有更多參數(shù)的長形式,它們允許你按需覆蓋默認(rèn)行為,允許你完全控制。


    發(fā)由消息

    要向交換器中發(fā)布消息,可按下面這樣來使用Channel.basicPublish方法:

    byte[] messageBodyBytes = "Hello, world!".getBytes(); 
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

    為了更好的控制,你可以使用重載方法來指定mandatory標(biāo)志,或使用預(yù)先設(shè)置的消息屬性來發(fā)送消息:

    channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

    這會(huì)使用分發(fā)模式2(持久化)來發(fā)送消息, 優(yōu)先級為1,且content-type 為"text/plain".你可以使用Builder類來構(gòu)建你自己的消息屬性對象:

    channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

    下面的例子使用自定義的headers來發(fā)布消息:

    Map<String, Object> headers = new HashMap<String, Object>(); 
    headers.put("latitude", 51.5252949);
    headers.put("longitude", -0.0905493);
    channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

    下面的例子使用expiration來發(fā)布消息:

    channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

    BasicProperties is an inner class of the autogenerated holder class AMQP.

    Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

    Channels 和并發(fā)考慮(線程安全性)

    Channel 實(shí)例不能在多個(gè)線程間共享。應(yīng)用程序必須在每個(gè)線程中使用不同的channel實(shí)例,而不能將同個(gè)channel實(shí)例在多個(gè)線程間共享。 有些channl上的操作是線程安全的,有些則不是,這會(huì)導(dǎo)致傳輸時(shí)出現(xiàn)錯(cuò)誤的幀交叉。
    在多個(gè)線程共享channels也會(huì)干擾Publisher Confirms.

    通過訂閱來來接收消息

    import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

    接收消息最高效的方式是用Consumer接口來訂閱。當(dāng)消息到達(dá)時(shí),它們會(huì)自動(dòng)地進(jìn)行分發(fā),而不需要顯示地請求

    當(dāng)在調(diào)用Consumers的相關(guān)方法時(shí), 個(gè)別訂閱總是通過它們的consumer tags來確定的, consumer tags可通過客戶端或服務(wù)端來生成,參考 the AMQP specification document.
    同一個(gè)channel上的消費(fèi)者必須有不同的consumer tags.

    實(shí)現(xiàn)Consumer的最簡單方式是繼承便利類DefaultConsumer.子類可通過在設(shè)置訂閱時(shí),將其傳遞給basicConsume調(diào)用:

    boolean autoAck = false; 
    channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
    @Override
    publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
    String routingKey = envelope.getRoutingKey();
    String contentType = properties.getContentType();
    long deliveryTag = envelope.getDeliveryTag();
    // (process the message components here ...)
    channel.basicAck(deliveryTag, false);
    }
    });

    在這里,由于我們指定了autoAck = false,因此消費(fèi)者有必要應(yīng)答分發(fā)的消息,最便利的方式是在handleDelivery 方法中處理.

    更復(fù)雜的消費(fèi)者可能需要覆蓋更多的方法,實(shí)踐中,handleShutdownSignal會(huì)在channels和connections關(guān)閉時(shí)調(diào)用,handleConsumeOk 會(huì)在其它消費(fèi)者之前

    調(diào)用
    ,傳遞consumer tag(不明白,要研究)。

     

    消費(fèi)者可實(shí)現(xiàn)handleCancelOk 和 handleCancel方法來接收顯示和隱式取消操作通知。

    你可以使用Channel.basicCancel來顯示地取消某個(gè)特定的消費(fèi)者:

    channel.basicCancel(consumerTag);

    passing the consumer tag.

    消費(fèi)者回調(diào)是在單獨(dú)線程上處理的,這意味著消費(fèi)者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上調(diào)用阻塞方法。

    每個(gè)Channel都有其自己的dispatch線程.對于一個(gè)消費(fèi)者一個(gè)channel的大部分情況來說,這意味著消費(fèi)者不會(huì)阻擋其它的消費(fèi)者。如果在一個(gè)channel上多個(gè)消費(fèi)者,則必須意識(shí)到長時(shí)間運(yùn)行的消費(fèi)者可能阻擋此channel上其它消費(fèi)者回調(diào)調(diào)度.

    獲取單個(gè)消息

    要顯示地獲取一個(gè)消息,可使用Channel.basicGet.返回值是一個(gè)GetResponse實(shí)例, 在它之中,header信息(屬性) 和消息body都可以提取:

    boolean autoAck = false; 
    GetResponse response = channel.basicGet(queueName, autoAck);
    if (response == null) {
    // No message retrieved.
    } else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag(); ...

    因?yàn)?span style="background-color: inherit; color: #333333;">autoAck = false,你必須調(diào)用Channel.basicAck來應(yīng)答你已經(jīng)成功地接收了消息:

    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

    處理未路由消息

    如果發(fā)布消息時(shí),設(shè)置了"mandatory"標(biāo)志,但如果消息不能路由的話,broker會(huì)將其返回到發(fā)送客戶端 (通過 AMQP.Basic.Return 命令).

    要收到這種返回的通知, clients可實(shí)現(xiàn)ReturnListener接口,并調(diào)用Channel.setReturnListener.如果channel沒有配置return listener,那么返回的消息會(huì)默默地丟棄。

    channel.setReturnListener(new ReturnListener() {     
        publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
    ...
        }
    });

     return listener將被調(diào)用,例如,如果client使用"mandatory"標(biāo)志向未綁定隊(duì)列的direct類型交換器發(fā)送了消息.

    關(guān)閉協(xié)議

    AMQP client 關(guān)閉概述

    AMQP 0-9-1 connection和channel 使用相同的方法來管理網(wǎng)絡(luò)故障,內(nèi)部故障,以及顯示本地關(guān)閉.

    AMQP 0-9-1 connection  和 channel 有如下的生命周期狀態(tài):

    • open: 準(zhǔn)備要使用的對象
    • closing: 對象已顯示收到收到本地關(guān)閉通知, 并向任何支持的底層對象發(fā)出關(guān)閉請求,并等待其關(guān)閉程序完成
    • closed: 對象已收到所有底層對象的完成關(guān)閉通知,最終將執(zhí)行關(guān)閉操作

    這些對象總是以closed狀態(tài)結(jié)束的,不管基于什么原因引發(fā)的關(guān)閉,比如:應(yīng)用程序請求,內(nèi)部client library故障, 遠(yuǎn)程網(wǎng)絡(luò)請求或網(wǎng)絡(luò)故障.

    AMQP connection 和channel 對象會(huì)持有下面與關(guān)閉相關(guān)的方法:

    • addShutdownListener(ShutdownListener 監(jiān)聽器)和removeShutdownListener(ShutdownListener 監(jiān)聽器),用來管理監(jiān)聽器,當(dāng)對象轉(zhuǎn)為closed狀態(tài)時(shí),將會(huì)觸發(fā)這些監(jiān)聽器.注意,在已經(jīng)關(guān)閉的對象上添加一個(gè)ShutdownListener將會(huì)立即觸發(fā)監(jiān)聽器
    • getCloseReason(), 允許同其交互以了解對象關(guān)閉的理由
    • isOpen(), 用于測試對象是否處于open狀態(tài)
    • close(int closeCode, String closeMessage), 用于顯示通知對象關(guān)閉

    可以像這樣來簡單使用監(jiān)聽器:

    import com.rabbitmq.client.ShutdownSignalException; 
    import com.rabbitmq.client.ShutdownListener;
    connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause) { ... } }
    );

    關(guān)閉環(huán)境信息

    可通過顯示調(diào)用getCloseReason()方法或通過使用ShutdownListener類中的業(yè)務(wù)方法的cause參數(shù)來ShutdownSignalException中獲取關(guān)閉原因的有用信息.

    ShutdownSignalException 類提供方法來分析關(guān)閉的原因.通過調(diào)用isHardError()方法,我們可以知道是connection錯(cuò)誤還是channel錯(cuò)誤.getReason()會(huì)返回相關(guān)cause的相關(guān)信息,這些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引發(fā)的異常,如網(wǎng)絡(luò)通信故障,在這種情況下,可通過getCause()方法來獲取信息).

    public void shutdownCompleted(ShutdownSignalException cause) {   if (cause.isHardError())   {     
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication()) {
    Method reason = cause.getReason(); ... } ... }
    else { Channel ch = (Channel)cause.getReference(); ... } }

    原子使用isOpen()方法

    channel和connection對象的isOpen()方法不建議在在生產(chǎn)代碼中使用,因?yàn)榇朔椒ǖ姆祷刂狄蕾囉?span style="color: #555555;">shutdown cause的存在性. 下面的代碼演示了竟?fàn)帡l件的可能性:

    public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

    相反,我們應(yīng)該忽略這種檢查,并簡單地嘗試這種操作.如果代碼執(zhí)行期間,connection的channel關(guān)閉了,那么將拋出ShutdownSignalException,這就表明對象處于一種無效狀態(tài)了.當(dāng)broker意外關(guān)閉連接時(shí),我們也應(yīng)該捕獲由SocketException引發(fā)的IOException,或者當(dāng)broker清理關(guān)閉時(shí),捕獲ShutdownSignalException.

    public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

    高級連接選項(xiàng)

    Consumer線程池

    Consumer 線程默認(rèn)是通過一個(gè)新的ExecutorService線程池來自動(dòng)分配的(參考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的線程池.下面的示例展示了一個(gè)比正常分配稍大的線程池:

    ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 
    Executors 和 ExecutorService 都是java.util.concurrent包中的類.

    當(dāng)連接關(guān)閉時(shí),默認(rèn)的ExecutorService將會(huì)被shutdown(), 但用戶自定義的ExecutorService (如上面所示)將不會(huì)被shutdown(). 提供自定義ExecutorService的Clients必須確保最終它能被關(guān)閉(通過調(diào)用它的shutdown() 方法), 否則池中的線程可能會(huì)阻止JVM終止.

    同一個(gè)executor service,可在多個(gè)連接之間共享,或者連續(xù)地在重新連接上重用,但在shutdown()后,則不能再使用.

    使用這種特性時(shí),唯一需要考慮的是:在消費(fèi)者回調(diào)的處理過程中,是否有證據(jù)證明有嚴(yán)重的瓶頸. 如果沒有消費(fèi)者執(zhí)行回調(diào),或很少,默認(rèn)的配置是綽綽有余. 開銷最初是很小的,分配的全部線程資源也是有界限的,即使偶爾可能出現(xiàn)一陣消費(fèi)活動(dòng).

    使用Host列表

    可以傳遞一個(gè)Address數(shù)組給newConnection()Address只是 com.rabbitmq.client 包中包含host和port組件的簡單便利類. 例如:

    Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 
    將會(huì)嘗試連接hostname1:portnumber1, 如果不能連接,則會(huì)連接hostname2:portnumber2,然后會(huì)返回?cái)?shù)組中第一個(gè)成功連接(不會(huì)拋出IOException)上broker的連接.這完全等價(jià)于在factory上重復(fù)調(diào)用factory.newConnection()方法來設(shè)置host和port, 直到有一個(gè)成功返回.

    如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么線程池將與第一個(gè)成功連接相關(guān)聯(lián).

    心跳超時(shí)

    參考Heartbeats guide 來了解更多關(guān)于心跳及其在Java client中如何配置的更多信息.

    自定義線程工廠

    像Google App Engine (GAE)這樣的環(huán)境會(huì)限制線程直接實(shí)例化. 在這樣的環(huán)境中使用RabbitMQ Java client, 需要配置一個(gè)定制的ThreadFactory,即使用合適的方法來實(shí)例化線程,如: GAE's ThreadManager. 下面是Google App Engine的相關(guān)代碼.

    import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

    網(wǎng)絡(luò)故障時(shí)自動(dòng)恢復(fù)

    Connection恢復(fù)

    clients和RabbitMQ節(jié)點(diǎn)之間的連接可發(fā)生故障. RabbitMQ Java client 支持連接和拓?fù)?queues, exchanges, bindings, 和consumers)的自動(dòng)恢復(fù). 大多數(shù)應(yīng)用程序的連接自動(dòng)恢復(fù)過程會(huì)遵循下面的步驟:

    1. 重新連接
    2. 恢復(fù)連接監(jiān)聽器
    3. 重新打開通道
    4. 恢復(fù)通道監(jiān)聽器
    5. 恢復(fù)通道basic.qos 設(shè)置,發(fā)布者確認(rèn)和事務(wù)設(shè)置
    拓?fù)浠謴?fù)包含下面的操作,每個(gè)通道都會(huì)執(zhí)行下面的步驟:
    1. 重新聲明交換器(except for predefined ones)
    2. 重新聲明隊(duì)列
    3. 恢復(fù)所有綁定
    4. 恢復(fù)所有消費(fèi)者
    要啟用自動(dòng)連接恢復(fù),須使用factory.setAutomaticRecoveryEnabled(true):
    ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();
    如果恢復(fù)因異常失敗(如. RabbitMQ節(jié)點(diǎn)仍然不可達(dá)),它會(huì)在固定時(shí)間間隔后進(jìn)行重試(默認(rèn)是5秒). 時(shí)間間隔可以進(jìn)行配置:
    ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
    當(dāng)提供了address列表時(shí),將會(huì)在所有address上逐個(gè)進(jìn)行嘗試:
    ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

    恢復(fù)監(jiān)聽器

    可在可恢復(fù)連接和通道上注冊一個(gè)或多個(gè)恢復(fù)監(jiān)聽器. 當(dāng)啟用了連接恢復(fù)時(shí),ConnectionFactory#newConnection 和 Connection#createChannel 的連接已實(shí)現(xiàn)了com.rabbitmq.client.Recoverable,并提供了兩個(gè)方法:

    • addRecoveryListener
    • removeRecoveryListener
    注意,在使用這些方法時(shí),你需要將connections和channels強(qiáng)制轉(zhuǎn)換為Recoverable.

    發(fā)布影響

    當(dāng)連接失敗時(shí),使用Channel.basicPublish方法發(fā)送的消息將會(huì)丟失. client不會(huì)保證在連接恢復(fù)后,消息會(huì)得到分發(fā).要確保發(fā)布的消息到達(dá)了RabbitMQ,應(yīng)用程序必須使用Publisher Confirms 


    拓?fù)浠謴?fù)

    拓?fù)浠謴?fù)涉及交換器,隊(duì)列,綁定以及消費(fèi)者恢復(fù).默認(rèn)是啟用的,但也可以禁用:

    ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

    手動(dòng)應(yīng)答和自動(dòng)恢復(fù)

    當(dāng)使用手動(dòng)應(yīng)答時(shí),在消息分發(fā)與應(yīng)答之間可能存在網(wǎng)絡(luò)連接中斷. 在連接恢復(fù)后,RabbitMQ會(huì)在所有通道上重設(shè)delivery標(biāo)記. 也就是說,使用舊delivery標(biāo)記的basic.ackbasic.nack, 以及basic.reject將會(huì)引發(fā)channel exception. 為了避免發(fā)生這種情況, RabbitMQ Java client可以跟蹤,更新,以使它們在恢復(fù)期間單調(diào)地增長. Channel.basicAckChannel.basicNack, 以及Channel.basicReject 然后可以轉(zhuǎn)換這些 delivery tags,并且不再發(fā)送過期的delivery tags. 使用手動(dòng)應(yīng)答和自動(dòng)恢復(fù)的應(yīng)用程序必須負(fù)責(zé)處理重新分發(fā).

    未處理異常

    關(guān)于connection, channel, recovery, 和consumer lifecycle 的異常將會(huì)委派給exception handler,Exception handler是實(shí)現(xiàn)了ExceptionHandler接口的任何對象. 默認(rèn)情況下,將會(huì)使用DefaultExceptionHandler實(shí)例,它會(huì)將異常細(xì)節(jié)輸出到標(biāo)準(zhǔn)輸出上.

    可使用ConnectionFactory#setExceptionHandler來覆蓋原始handler,它將被用于由此factory創(chuàng)建的所有連接:

    ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         
    Exception handlers 應(yīng)該用于異常日志.

    Google App Engine上的RabbitMQ Java Client

    在Google App Engine (GAE) 上使用RabbitMQ Java client,必須使用一個(gè)自定義的線程工廠來初始化線程,如使用GAE's ThreadManager. 此外,還需要設(shè)置一個(gè)較小的心跳間隔(4-5 seconds) 來避免InputStream 上讀超時(shí):

    ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

    警告和限制

    為了能使拓?fù)浠謴?fù), RabbitMQ Java client 維持了聲明隊(duì)列,交換器,綁定的緩存. 緩存是基于每個(gè)連接的.某些RabbitMQ的特性使得客戶端不能觀察到拓?fù)涞淖兓?如,當(dāng)隊(duì)列因TTL被刪除時(shí). RabbitMQ Java client 會(huì)嘗試在下面的情況中使用緩存實(shí)體失效:

    • 當(dāng)隊(duì)列被刪除時(shí).
    • 當(dāng)交換器被刪除時(shí).
    • 當(dāng)綁定被刪除時(shí).
    • 當(dāng)消費(fèi)者在自動(dòng)刪除隊(duì)列上退出時(shí).
    • 當(dāng)隊(duì)列或交換器不再綁定到自動(dòng)刪除的交換器上時(shí).
    然而, 除了單個(gè)連接外,client不能跟蹤這些拓?fù)渥兓? 依賴于自動(dòng)刪除隊(duì)列或交換器的應(yīng)用程序,正如TTL隊(duì)列一樣 (注意:不是消息TTL!), 如果使用了自動(dòng)連接恢復(fù),在知道不再使用或要?jiǎng)h除時(shí),必須明確地刪除這些緩存實(shí)體,以凈化 client-side 拓?fù)鋍ache. 這些可通過Channel#queueDeleteChannel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind來進(jìn)行.

    RPC (Request/Reply) 模式

    為了便于編程, Java client API提供了一個(gè)使用臨時(shí)回復(fù)隊(duì)列的RpcClient類來提供簡單的RPC-style communication.

    此類不會(huì)在RPC參數(shù)和返回值上強(qiáng)加任何特定格式. 它只是簡單地提供一種機(jī)制來向帶特定路由鍵的交換器發(fā)送消息,并在回復(fù)隊(duì)列上等待響應(yīng).

    import com.rabbitmq.client.RpcClient;  
    RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

    (其實(shí)現(xiàn)細(xì)節(jié)為:請求消息使用basic.correlation_id唯一值字段來發(fā)送消息,并使用basic.reply_to來設(shè)置響應(yīng)隊(duì)列的名稱.)

    一旦你創(chuàng)建此類的實(shí)例,你可以使用下面的任意一個(gè)方法來發(fā)送RPC請求:

    byte[] primitiveCall(byte[] message); 
    String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

    primitiveCall 方法會(huì)將原始byte數(shù)組轉(zhuǎn)換為請求和響應(yīng)的消息體. stringCall只是一個(gè)primitiveCall的簡單包裝,將消息體視為帶有默認(rèn)字符集編碼的String實(shí)例.

    mapCall 變種稍為有些復(fù)雜: 它會(huì)將原始java值包含在java.util.Map中,并將其編碼為AMQP 0-9-1二進(jìn)制表示形式,并以同樣的方式來解碼response. (注意:在這里,對一些值對象類型有所限制,具體可參考javadoc.)

    所有的編組/解組便利方法都使用primitiveCall來作為傳輸機(jī)制,其它方法只是在它上面的做了一個(gè)封裝.

    posted on 2016-06-04 00:37 胡小軍 閱讀(15647) 評論(1)  編輯  收藏 所屬分類: RabbitMQ

    FeedBack:
    # re: Java Client API Guide 2016-06-05 17:10 有機(jī)綠茶
    非常詳細(xì)的介紹!學(xué)習(xí)啦!  回復(fù)  更多評論
      
    主站蜘蛛池模板: 男女超爽视频免费播放| 日韩精品无码专区免费播放| 亚洲狠狠ady亚洲精品大秀| 国产成人在线观看免费网站| 99re热精品视频国产免费| 一区二区三区免费看| 亚洲国产综合AV在线观看| 亚洲精品国产福利片| 亚洲va久久久噜噜噜久久天堂| 国产一级一片免费播放i| 日韩精品福利片午夜免费观着| 91在线视频免费观看| 免费国产草莓视频在线观看黄| 最新亚洲春色Av无码专区| 亚洲白嫩在线观看| 亚洲国产精品自在在线观看| 亚洲人成网77777亚洲色| 亚洲福利视频一区二区| 日本免费人成黄页网观看视频 | 国产成人aaa在线视频免费观看 | 亚洲国产主播精品极品网红| 免费视频淫片aa毛片| 欧美好看的免费电影在线观看| 久久国产色AV免费观看| 久久青草免费91线频观看不卡 | 亚洲an天堂an在线观看| 精品国产综合成人亚洲区| 亚洲人成色77777在线观看大| 国产伦精品一区二区三区免费下载| 毛片网站免费在线观看| 91情侣在线精品国产免费| 波多野结衣免费在线| 中文字幕无码视频手机免费看| 成人在线免费看片| 91视频国产免费| 日韩高清在线免费看| 国产成人免费A在线视频| 亚洲福利视频一区二区| 丁香五月亚洲综合深深爱| 亚洲无人区一区二区三区| 国产亚洲一区二区精品|