<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    3. 參考

    這部分參考文檔詳細描述了組成Sring AMQP的各種組件. main chapter 涵蓋了開發(fā)AMQP應(yīng)用程序的核心類. 這部分也包含了有關(guān)示例程序的章節(jié).

    3.1 使用 Spring AMQP

    在本章中,我們將探索接口和類,它們是使用Spring AMQP來開發(fā)應(yīng)用程序的必要組件 .

    3.1.1 AMQP 抽象

    介紹

    Spring AMQP 由少數(shù)幾個模塊組成, 每個都以JAR的形式來表現(xiàn).這些模塊是: spring-amqp和spring-rabbit. spring-amqp模塊包含org.springframework.amqp.core 包.
    在那個包中,你會找到表示AMQP核心模塊的類. 我們的目的是提供通用的抽象,不依賴于任何特定中間件的實現(xiàn)或AMQP客戶端庫。
    最終用戶代碼將更具有移植性,以便跨供應(yīng)商實現(xiàn),因為它們可以對抽象層開發(fā)。這些抽象是使用broker的特定模塊實現(xiàn)的,如
    spring-rabbit
    目前只有一個RabbitMQ實現(xiàn);而對于抽象的驗證,除了RabbitMQ外,也已經(jīng)在.Net平臺上使用Apache 
    Qpid得到了驗證。
    由于AMQP原則上工作于協(xié)議層次,RabbitMQ客戶端可以在任何支持相同的協(xié)議版本的broker中使用,但目前我們沒有測試其它任何broker。

    這里的概述假設(shè)你已經(jīng)熟悉了AMQP規(guī)范.如果你還沒有,那么你可以查看第5章,其它資源中列舉的資源.

    Message

    AMQP 0-8 和0-9-1 規(guī)范沒有定義一個消息類或接口.相反,當(dāng)執(zhí)行basicPublish()這樣的操作的時候, 內(nèi)容是以字節(jié)數(shù)組參數(shù)進行傳遞的,其它額外屬性也是以單獨參數(shù)進行傳遞的. Spring AMQP定義了一個 Message 類來作為AMQP領(lǐng)域模型表示的一部分.Message類的目的是在單個實例中簡化封裝消息體(body)和屬性(header),這樣API就可以變得更簡單. Message類的定義相當(dāng)簡單.

    public class Message {
    
        private final MessageProperties messageProperties;
    
        private final byte[] body;
    
        public Message(byte[] body, MessageProperties messageProperties) {
            this.body = body;
            this.messageProperties = messageProperties;
        }
    
        public byte[] getBody() {
            returnthis.body;
        }
    
        public MessageProperties getMessageProperties() {
            returnthis.messageProperties;
        }
    }

    MessageProperties 接口定義了多個共同屬性,如messageIdtimestampcontentType 等等. 那些屬性可以通過調(diào)用setHeader(String key, Object value) 方法使用用戶定義頭(user-defined headers)來擴展.


    Exchange

    Exchange 接口代表的是AMQP Exchange,它是生產(chǎn)者發(fā)送消息的地方.broker虛擬主機中的交換器名稱都是唯一的,同時還有少量的其它屬性:

    public interface Exchange {
    
        String getName();
    
        String getExchangeType();
    
        boolean isDurable();
    
        boolean isAutoDelete();
    
        Map<String, Object> getArguments();
    
    }

    正如你所看到的, Exchange還有一個type (它是在ExchangeTypes中定義的常量). 基本類型是: DirectTopicFanout,和Headers.
    在核心包中,你可以找到每種類型的Exchange 接口實現(xiàn).這些交換器類型會在處理隊列綁定時,行為有所不同.
    例如,Direct交換器允許隊列以固定路由鍵進行綁定(通常是隊列的名稱).
    Topic交換器支持使用路由正則表達式(*通配符明確匹配一個,而#通配符可匹配0個或多個). 

    Fanout交換器會把消息發(fā)布到所有綁定到它上面的隊列而不考慮任何路由鍵.

    關(guān)于交換器類型的更多信息,查看Chapter 5, Other Resources.

    AMQP規(guī)范還要求任何broker必須提供一個默認(rèn)的無名字的(空字符串)Direct交換器.所有聲明的隊列都可以用它們的名字作為路由鍵綁定到默認(rèn)交換器中. 在Section 3.1.4, “AmqpTemplate”你會了解到更多關(guān)于在Sring AMQP中使用默認(rèn)交換器的使用情況.

    Queue

    Queue 代表的是消費者接收消息的組件. 像各種各樣的 Exchange 類,我們的實現(xiàn)目標(biāo)是作為核心AMQP類型的抽象表示.

    public class Queue  {
    
        private final String name;
    
        private volatile boolean durable;
    
        private volatile boolean exclusive;
    
        private volatile boolean autoDelete;
    
        private volatile Map<String, Object> arguments;
    
        /**
         * The queue is durable, non-exclusive and non auto-delete.
         *
         * @param name the name of the queue.
         */
     public Queue(String name) {
            this(name, true, false, false);
        }
    
        // Getters and Setters omitted for brevity
    
    }

    注意,構(gòu)造器需要接受隊列名稱作為參數(shù).根據(jù)實現(xiàn), admin template可能會提供生成獨特隊列名稱的方法.這些隊列作為回復(fù)地址或用于臨時情景是非常有用的.
    基于這種原因,自動生成隊列的exclusive和 autoDelete 屬性都應(yīng)該設(shè)置為true.

    參考 Section 3.1.10, “Configuring the broker” 來了解關(guān)于使用命名空間來聲明隊列,包括隊列參數(shù)的詳細情況.

    Binding

    生產(chǎn)者發(fā)送消息到Exchange,而消費者將從Queue中獲取消息,連接Queues與Exchanges之間的綁定對于通過消息來連接生產(chǎn)者和消費者是非常關(guān)鍵的.
    在Spring AMQP中,我們定義了一個 Binding 類來表示這些連接. 讓我們重新回顧一下綁定隊列和交換器的操作.

    你可以使用固定的路由鍵來綁定 Queue 到 DirectExchange上.

    new Binding(someQueue, someDirectExchange, "foo.bar")

    你可以使用路由正則表達式來綁定Queue到TopicExchange上.

    new Binding(someQueue, someTopicExchange, "foo.*")

    你可以不使用路由鍵來綁定Queue到FanoutExchange上.

    new Binding(someQueue, someFanoutExchange)

    我們還提供了BindingBuilder來方便操作.

    Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

    上面展示的BindingBuilder 類很清晰,但如果為bind()方法使用靜態(tài)導(dǎo)入,這種形式將工作得更好.

    本身來說,Binding類的實例只能一個connection中持有數(shù)據(jù).換句話說,它不是一個活力(active)組件.
    但正如在后面Section 3.1.10, “Configuring the broker”看到的, Binding實例可由AmqpAdmin 類來觸發(fā)broker上的綁定操作.
    同樣,在同一個章節(jié)中,你還會看到Binding實例可在@Configuration類中使用Spring @Bean風(fēng)
    格來定義.
    還有方便的基類來簡化生成AMQP相關(guān)bean定義和識別隊列,交換器,綁定的方法,這樣當(dāng)AMQP broker運行程序啟動時,就可以得到聲明.

    AmqpTemplate 也在核心包中定義.作為涉及AMQP消息的主要組件, 會在它自己的章節(jié)中進行詳細介紹(參考Section 3.1.4, “AmqpTemplate”).

    3.1.2 連接和資源管理

    介紹

    雖然我們在前面章節(jié)中描述的AMQP模型是通用的,適用于所有實現(xiàn),但當(dāng)我們說到資源管理時,其細節(jié)是針對特定broker實現(xiàn)的.因此,在這個章節(jié)中,我們只關(guān)注我們的"spring-rabbit"模塊,因為到現(xiàn)在為止,RabbitMQ是唯一支持的實現(xiàn).

    RabbitMQ broker中用于管理連接的中心組件是ConnectionFactory 接口. ConnectionFactory實現(xiàn)的責(zé)任是提供一個org.springframework.amqp.rabbit.connection.Connection 的實例,它包裝了com.rabbitmq.client.Connection.
    我們提供的唯一具體實現(xiàn)提CachingConnectionFactory,默認(rèn)情況下,會建立應(yīng)用程序可共享的單個連接代理.連接共享是可行的,因為在AMQP處理消息的工作單位實際是 "channel" (在某些方面,這類似于JMS中Connection 和 Sessionin的關(guān)系).
    你可以想象,連接實例提供了一個createChannel方法。CachingConnectionFactory 實現(xiàn)支持這些channels的緩存,它會基于它們是否是事務(wù)的來單獨維護其緩存.
    當(dāng)創(chuàng)建
    CachingConnectionFactory的實例時hostname 可通過構(gòu)造器來提供,username 和password 屬性也可以提供.如果你想配置channel緩存的大小(默認(rèn)是25),你可以調(diào)用setChannelCacheSize()方法.

    1.3版本開始,CachingConnectionFactory 也可以同channel一樣,配置緩存連接.在這種情況下每次調(diào)用createConnection() 都會創(chuàng)建一個新連接(或者從緩存中獲取空閑的連接).
    關(guān)閉連接會將其返回到緩存中(如果還沒有達到緩存大小的話).在這些連接上創(chuàng)建的Channels同樣也會被緩存. 單獨連接的使用在某些環(huán)境中是有用的,如從HA 集群中消費, 連接負載均衡器,連接不同的集群成員.設(shè)置cacheMode 為 CacheMode.CONNECTION.

    這不會限制連接的數(shù)目,它用于指定允許空閑打開連接的數(shù)目.

    從1.5.5版本開始,提供了一個新屬性connectionLimit.當(dāng)設(shè)置了此屬性時,它會限制連接的總數(shù)目,當(dāng)達到限制值時,將channelCheckoutTimeLimit 來等待空閑連接.如果時間超時了,將拋出AmqpTimeoutException.

    重要

    當(dāng)緩存模式是CONNECTION, 隊列的自動聲明等等 (參考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”) 將不再支持.

    此外,在寫作的時候,rabbitmq-client 包默認(rèn)為每個連接(5個線程)創(chuàng)建了一個固定的線程池. 當(dāng)需要使用大量連接時,你應(yīng)該考慮在CachingConnectionFactory定制一個executor. 然后,同一個executor會用于所有連接,其線程也是共享的.  
    executor的線程池是沒有界限的或按預(yù)期使用率來設(shè)置(通常, 一個連接至少應(yīng)該有一個線程).如果在每個連接上創(chuàng)建了多個channels,那么池的大小會影響并發(fā)性,因此一個可變的線程池executor應(yīng)該是最合適的.


    理解緩存大小不是限制是很重要的, 它僅僅是可以緩存的通道數(shù)量.當(dāng)說緩存大小是10時,在實際使用中,其實可以是任何數(shù)目的通道. 如果超過10個通道被使用,他們都返回到高速緩存,10個將在高速緩存中,其余的將物理關(guān)閉。

    1.6版本開始,默認(rèn)通道緩存大小從1增加到了25. 在高容量,多線程環(huán)境中,較小的緩存意味著通道的創(chuàng)建和關(guān)閉將以很高的速率運行.加大默認(rèn)緩存大小可避免這種開銷.
    你可以監(jiān)控通道的使用情況(通過RabbitMQ Admin UI) ,如果看到有很多通道在創(chuàng)建和關(guān)閉,你可以增大緩存大小.
    緩存只會增長按需(以適應(yīng)應(yīng)用程序的并發(fā)性要求),所以這個更改不會影響現(xiàn)有的低容量應(yīng)用程序。

    1.4.2版本開始,CachingConnectionFactory 有一個channelCheckoutTimeout屬性. 當(dāng)此屬性的值大于0時,channelCacheSize 會變成連接上創(chuàng)建通道數(shù)目的限制.
    如果達到了限制,調(diào)用線程將會阻塞,直到某個通道可用或者超時, 在后者的情況中,將拋出AmqpTimeoutException 異常.

    在框架(如.RabbitTemplate)中使用的通道將會可靠地返回到緩存中.如果在框架外創(chuàng)建了通道 (如.直接訪問connection(s)并調(diào)用createChannel()),你必須可靠地返回它們(通過關(guān)閉),也許需要在 finally 塊中以防止耗盡通道.

    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    
    Connection connection = connectionFactory.createConnection();

    當(dāng)使用XML時,配置看起來像下面這樣:

    <bean id="connectionFactory"
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username"value="guest"/>
    <property name="password" value="guest"/>
    </bean>

    這里也有一個 SingleConnectionFactory 實現(xiàn),它只能用于框架的單元測試代碼中.它比CachingConnectionFactory 簡單,因為它不會緩存通道,由于其缺乏性能和韌性,它不打算用于簡單的測試以外的實際使用.
    如果基于某些原
    因,你需要自己來實現(xiàn)ConnectionFactory ,AbstractConnectionFactory 基類提供了一個非常好的起點.

    ConnectionFactory 可使用rabbit命名空間來快速方便的建立:

    <rabbit:connection-factory id="connectionFactory"/>

    在多數(shù)情況下,這是很好的,因為框架會為你選擇最好的默認(rèn)值.創(chuàng)建的實例會是CachingConnectionFactory.要記住,默認(rèn)的緩存大小是25.如果你想緩存更多的通道,你可以設(shè)置channelCacheSize 屬性值.在XML中,它看起來像下面這樣:

    <bean id="connectionFactory"
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
    </bean>

    在命名空間中,你也可以添加channel-cache-size 屬性:

    <rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>

    默認(rèn)的緩存模式是CHANNEL, 但你可以使用緩存連接來替換;在這種情況下,我們會使用connection-cache-size:

    <rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

    Host 和 port 屬性也可以在命名空間中提供:

    <rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>

    此外,如果運行集群環(huán)境中,使用addresses屬性.

    <rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>

    下面是一個自定義的線程工廠,其前輟線程名稱為rabbitmq-.

    <rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567" thread-factory="tf" channel-cache-size="10" username="user" password="password" />
    <bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
    </bean>

    配置底層客戶端連接工廠

    CachingConnectionFactory 使用的是 Rabbit client ConnectionFactory的實例; 當(dāng)在CachingConnectionFactory設(shè)置等價屬性時,許多屬性(host, port, userName, password, requestedHeartBeat, connectionTimeout) 來傳遞.
    要設(shè)置其它屬性(例如clientProperties),可定義一個rabbit factory 的實例,并使用CachingConnectionFactory的適當(dāng)構(gòu)造器來提供引用.
    當(dāng)使用上面提到的命名空間時,要在connection-factory屬性中提供一個工廠的引用來配置
    為方便起見,提供了一個工廠,以協(xié)助在一個Spring應(yīng)用程序上下文中配置連接工廠,在下一節(jié)討論。

    <rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

    RabbitConnectionFactoryBean 和配置SSL

    1.4版本開始, 提供了一個便利的RabbitConnectionFactoryBean 類通過依賴注入來配置底層客戶端連接工廠的SSL屬性.其它設(shè)置簡單地委派給底層工廠.以前你必須以編程方式配置SSL選項。

    <rabbit:connection-factory id="rabbitConnectionFactory"connection-factory="clientConnectionFactory" host="${host}" port="${port}" virtual-host="${vhost}" username="${username}" password="${password}" />
    <bean id="clientConnectionFactory" class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
    </bean>

    參考 RabbitMQ Documentation 來了解關(guān)于配置SSL的更多信息. 省略的keyStore 和 trustStore 配置將在無證書驗證的情況下,通過SSL來連接. Key和trust store 配置可以按如下提供:

    sslPropertiesLocation 屬性是一個Spring Resource ,它指向一個包含下面key的屬性文件:

    keyStore=file:/secret/keycert.p12
    trustStore=file:/secret/trustStore
    keyStore.passPhrase=secret
    trustStore.passPhrase=secret

    keyStore 的 truststore 是指向store的 Spring Resources .通常情況下,這個屬性文件在操作系統(tǒng)之下安全的,應(yīng)用程序只能讀取訪問.

    從Spring AMQP 1.5版本開始,這些屬性可直接在工廠bean上設(shè)置.如果同時提供了discrete和 sslPropertiesLocation 屬性, 后者屬性值會覆蓋discrete值.

    路由連接工廠

    1.3版本開始,引入了AbstractRoutingConnectionFactory.這提供了一種機制來配置多個ConnectionFactories的映射,并通過在運行時使用lookupKey來決定目標(biāo)ConnectionFactory

    通常,實現(xiàn)會檢查線程綁定上下文. 為了方便, Spring AMQP提供了SimpleRoutingConnectionFactory, 它會從SimpleResourceHolder中獲取當(dāng)前線程綁定的lookupKey:

    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <propertyname="targetConnectionFactories">
    <map>
    <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/> 
    <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
    </map>
    </property>
    </bean>
    <rabbit:template id="template" connection-factory="connectionFactory" />
    public class MyService {
    
    	@Autowired
      private RabbitTemplate rabbitTemplate;
    
    	public  void service(String vHost, String payload) {
    		SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
    		rabbitTemplate.convertAndSend(payload);
    		SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    	}
    
    }

    在使用資源后,對其進行解綁是很重要的.更多信息參考AbstractRoutingConnectionFactoryJavaDocs.

    1.4版本開始RabbitTemplate 支持SpEL sendConnectionFactorySelectorExpression 和receiveConnectionFactorySelectorExpression 屬性,
    它會在每個AMQP 協(xié)議交互操作(sendsendAndReceivereceiveor receiveAndReply)進行評估, 為提供的AbstractRoutingConnectionFactory類解析lookupKey值.
    Bean 引用,如"@vHostResolver.getVHost(#root)" 可用于表達式中.對于send 操作,  要發(fā)送的消息是根評估對象;對于receive操作queueName 是根評估對象.

    路由算法為:如果selector 表達式為null,或等價于null,或提供的ConnectionFactory 不是AbstractRoutingConnectionFactory的實例,根據(jù)提供的ConnectionFactory 實現(xiàn),
    所有的工作都按之前的進行.同樣的結(jié)果也會發(fā)生:如果評估結(jié)果不為null,但對于lookupKey 無目標(biāo)ConnectionFactory,且 the AbstractRoutingConnectionFactory 使用lenientFallback = true進行了配置.
    當(dāng)然,在AbstractRoutingConnectionFactory 的情況下,它會基于determineCurrentLookupKey()的路由實現(xiàn)來進行回退. 但,如果lenientFallback = false, 將會拋出 IllegalStateException 異常.

    Namespace 在<rabbit:template>組件中也支持send-connection-factory-selector-expression 和receive-connection-factory-selector-expression屬性.

    也是從1.4版本開始, 你可以在SimpleMessageListenerContainer置路由連接工廠. 在那種情況下,隊列名稱的列表將作為lookup key.例如,如果你在容器中配置setQueueNames("foo", "bar"),lookup key將是"[foo,bar]" (無空格).




    posted on 2016-08-13 12:21 胡小軍 閱讀(6595) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 最近中文字幕mv免费高清视频7| 亚洲成年人在线观看| 亚洲毛片免费视频| 久久国产精品免费一区| 亚洲国产精品成人午夜在线观看| 亚洲天堂一区二区| 亚洲综合另类小说色区| 国产在线播放免费| 猫咪社区免费资源在线观看| 无码精品国产一区二区三区免费| 日韩精品无码永久免费网站| 亚洲色精品三区二区一区| 亚洲精品第一国产综合精品| 久久亚洲精品国产精品黑人| 国产亚洲大尺度无码无码专线| 亚洲国产人成中文幕一级二级| 国产黄色片在线免费观看| 好男人看视频免费2019中文| 99久久99这里只有免费费精品| 无码国产精品一区二区免费3p| 免费a级毛片无码a∨免费软件| 一区二区免费电影| 日韩一级片免费观看| 麻豆安全免费网址入口| 美女扒开尿口给男人爽免费视频| 亚洲成a人片在线观看天堂无码 | 亚洲偷自拍另类图片二区| 亚洲国产精品xo在线观看| 久久亚洲AV无码精品色午夜麻豆| 亚洲国产精品人久久| 亚洲精选在线观看| 久久久久久久亚洲Av无码 | 最新黄色免费网站| 99久久99久久精品免费观看| 日韩精品内射视频免费观看 | 久久精品国产精品亚洲下载| 久久久久亚洲精品中文字幕| 国产亚洲成归v人片在线观看 | 无码专区AAAAAA免费视频| 99re在线这里只有精品免费| 在线日本高清免费不卡|