2.2 Transport
??? ActiveMQ目前支持的transport有:VM Transport、TCP
Transport、SSL Transport、Peer Transport、UDP Transport、Multicast
Transport、HTTP and HTTPS Transport、Failover Transport、Fanout
Transport、Discovery Transport、ZeroConf
Transport等。以下簡單介紹其中的幾種,更多請參考Apache官方文檔。
?
2.2.1 VM Transport
???
VM transport允許在VM內(nèi)部通信,從而避免了網(wǎng)絡(luò)傳輸?shù)拈_銷。這時(shí)候采用的連接不是socket連接,而是直接地方法調(diào)用。
第一個(gè)創(chuàng)建VM 連接的客戶會啟動一個(gè)embed VM broker,接下來所有使用相同的broker
name的VM連接都會使用這個(gè)broker。當(dāng)這個(gè)broker上所有的連接都關(guān)閉的時(shí)候,這個(gè)broker也會自動關(guān)閉。
??? 以下是配置語法:
?? vm://brokerName?transportOptions
?? 例如:vm://broker1?marshal=false&broker.persistent=false
?? Transport Options的可選值如下:
Option Name
|
Default Value
|
Description
|
Marshal |
false |
If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
? |
All the properties with this prefix are used to configure the wireFormat |
create |
true |
If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1 |
broker.* |
? |
All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information |
?
?? 以下是高級配置語法:
?? vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions
?? vm:broker:(tcp://localhost)?brokerOptions
??? 例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
??? Transport Options的可選值如下:
Option Name
|
Default Value
|
Description
|
marshal |
false |
If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
? |
All the properties with this prefix are used to configure the wireFormat |
?
?? 使用配置文件的配置語法:??
??? vm://localhost?brokerConfig=xbean:activemq.xml
??? 例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml
?
?? 使用Spring的配置:
-
<
bean
?
id
=
"broker"
?
class
=
"org.apache.activemq.xbean.BrokerFactoryBean"
>
??
-
??<property?name="config"?value="classpath:org/apache/activemq/xbean/activemq.xml"?/>??
-
??<property?name="start"?value="true"?/>??
-
</
bean
>
??
-
??
-
<
bean
?
id
=
"connectionFactory"
?
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
?
depends-on
=
"broker"
>
??
-
??<property?name="brokerURL"?value="vm://localhost"/>??
-
</
bean
>
??
??
如果persistent是true,那么ActiveMQ會在當(dāng)前目錄下創(chuàng)建一個(gè)缺省值是activemq-data的目錄用于持久化保存數(shù)據(jù)。需要注
意的是,如果程序中啟動了多個(gè)不同名字的VM broker,那么可能會有如下警告:Failed to start jmx connector:
Cannot bind to URL [rmi://localhost:1099/jmxrmi]:
javax.naming.NameAlreadyBoundException…可以通過在transportOptions中追加
broker.useJmx=false來禁用JMX來避免這個(gè)警告。
?
2.2.2 TCP Transport
??? TCP transport 允許客戶端通過TCP socket連接到遠(yuǎn)程的broker。以下是配置語法:
??? tcp://hostname:port?transportOptions
??? Transport Options的可選值如下:
Option Name
|
Default Value
|
Description
|
minmumWireFormatVersion |
0 |
The minimum version wireformat that is allowed |
trace |
false |
Causes all commands that are sent over the transport to be logged |
useLocalHost |
true |
When true, it causes the local machines name to resolve to "localhost". |
socketBufferSize |
64 * 1024 |
Sets the socket buffer size in bytes |
soTimeout |
0 |
sets the socket timeout in milliseconds |
connectionTimeout |
30000 |
A
non-zero value specifies the connection timeout in milliseconds. A zero
value means wait forever for the connection to be established. Negative
values are ignored. |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
? |
All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information |
?? 例如:tcp://localhost:61616?trace=false
?
2.2.3 Failover Transport
???
Failover
Transport是一種重新連接的機(jī)制,它工作于其它transport的上層,用于建立可靠的傳輸。它的配置語法允許制定任意多個(gè)復(fù)合的URI。
Failover
transport會自動選擇其中的一個(gè)URI來嘗試建立連接。如果沒有成功,那么會選擇一個(gè)其它的URI來建立一個(gè)新的連接。以下是配置語法:
??? failover:(uri1,...,uriN)?transportOptions
??? failover:uri1,...,uriN
??? Transport Options的可選值如下:
Option Name
|
D
efault Value
|
Description
|
initialReconnectDelay |
10 |
How long to wait before the first reconnect attempt (in ms) |
maxReconnectDelay |
30000 |
The maximum amount of time we ever wait between reconnect attempts (in ms) |
useExponentialBackOff |
true |
Should an exponential backoff be used between reconnect attempts |
backOffMultiplier |
2 |
The exponent used in the exponential backoff attempts |
maxReconnectAttempts |
0 |
If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
randomize |
true |
use a random algorithm to choose the URI to use for reconnect from the list provided |
backup |
false |
initialize and hold a second transport connection - to enable fast failover |
?? 例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
?
2.2.4 Discovery transport
??? Discovery transport是可靠的tranport。它使用Discovery transport來定位用來連接的URI列表。以下是配置語法:
??? discovery:(discoveryAgentURI)?transportOptions
??? discovery:discoveryAgentURI
??? Transport Options的可選值如下:
Option Name
|
Default Value
|
Description
|
initialReconnectDelay |
10 |
How long to wait before the first reconnect attempt |
maxReconnectDelay |
30000 |
The maximum amount of time we ever wait between reconnect attempts |
useExponentialBackOff |
true |
Should an exponential backoff be used btween reconnect attempts |
backOffMultiplier |
2 |
The exponent used in the exponential backoff attempts |
maxReconnectAttempts |
0 |
If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
?? 例如:discovery:(multicast://default)?initialReconnectDelay=100??
??? 為了使用Discovery來發(fā)現(xiàn)broker,需要為broker啟用discovery agent。 以下是XML配置文件中的一個(gè)例子:
-
<
broker
?
name
=
"foo"
>
??
-
???<transportConnectors>??
-
??????<transportConnector?uri="tcp://localhost:0"?discoveryUri="multicast://default"/>??
-
????</transportConnectors>??
-
????...??
-
</
broker
>
??
??
在使用Failover Transport或Discovery
transport等能夠自動重連的transport的時(shí)候,需要注意的是:設(shè)想有兩個(gè)broker,它們都啟用AMQ Message
Store作為持久化存儲,有一個(gè)producer和一個(gè)consumer連接到某個(gè)queue。當(dāng)因其中一個(gè)broker失效時(shí)而切換到另一個(gè)
broker的時(shí)候,如果失效的broker的queue中還有未被consumer消費(fèi)的消息,那么這個(gè)queue里的消息仍然滯留在失效broker
的中,直到失效的broker被修復(fù)并重新切換回這個(gè)被修復(fù)的broker后,之前被保留的消息才會被consumer消費(fèi)掉。如果被處理的消息有時(shí)序限
制,那么應(yīng)用程序就需要處理這個(gè)問題。另外也可以通過ActiveMQ集群來解決這個(gè)問題。
?? 在transport重連的時(shí)候,可以在connection上注冊TransportListener來獲得回調(diào),例如:
-
(ActiveMQConnection)connection).addTransportListener(
new
?TransportListener()?{??
-
????public?void?onCommand(Object?cmd)?{??
-
????}??
-
??
-
????public?void?onException(IOException?exp)?{??
-
????}??
-
??
-
????public?void?transportInterupted()?{??
-
??????????
-
????}??
-
??
-
????public?void?transportResumed()?{??
-
??????????
-
????}??
-
});