最近一個(gè)項(xiàng)目要用到ActiveMq,并且需要最大程度的保證消息不丟失。以前對(duì)activeMq不是很熟悉,完全是摸著石頭過(guò)河,目前基本配置都搞定了。只是對(duì)于它的自動(dòng)重連一直找不到好的解決辦法,我希望的效果是當(dāng)一個(gè)broker(假設(shè)只有這一個(gè),沒(méi)有備用的)如果異常down掉的話,那么監(jiān)聽(tīng)程序能夠等待broker重啟后再自動(dòng)重新連接。看了它的文檔似乎 設(shè)置一下failover:(tcp://localhost:61616) 就可以了
Failover Transport
Failover Transport是一種重新連接的機(jī)制,它工作于其它transport的上層,用于建立可靠的傳輸。它的配置語(yǔ)法允許制定任意多個(gè)復(fù)合的URI。 Failover transport會(huì)自動(dòng)選擇其中的一個(gè)URI來(lái)嘗試建立連接。如果沒(méi)有成功,那么會(huì)選擇一個(gè)其它的URI來(lái)建立一個(gè)新的連接。以下是配置語(yǔ)法:
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN
Transport Options的可選值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
backup false initialize and hold a second transport connection - to enable fast failover
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
2.2.4 Discovery transport
Discovery transport是可靠的tranport。它使用Discovery transport來(lái)定位用來(lái)連接的URI列表。以下是配置語(yǔ)法:
discovery:(discoveryAgentURI)?transportOptions
discovery:discoveryAgentURI
Transport Options的可選值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
例如:discovery:(multicast://default)?initialReconnectDelay=100
為了使用Discovery來(lái)發(fā)現(xiàn)broker,需要為broker啟用discovery agent。 以下是XML配置文件中的一個(gè)例子:
Xml代碼
1. <broker name="foo">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
4. </transportConnectors>
5. ...
6. </broker>
在使用Failover Transport或Discovery transport等能夠自動(dòng)重連的transport的時(shí)候,需要注意的是:設(shè)想有兩個(gè)broker,它們都啟用AMQ Message Store作為持久化存儲(chǔ),有一個(gè)producer和一個(gè)consumer連接到某個(gè)queue。當(dāng)因其中一個(gè)broker失效時(shí)而切換到另一個(gè) broker的時(shí)候,如果失效的broker的queue中還有未被consumer消費(fèi)的消息,那么這個(gè)queue里的消息仍然滯留在失效broker 的中,直到失效的broker被修復(fù)并重新切換回這個(gè)被修復(fù)的broker后,之前被保留的消息才會(huì)被consumer消費(fèi)掉。如果被處理的消息有時(shí)序限 制,那么應(yīng)用程序就需要處理這個(gè)問(wèn)題。另外也可以通過(guò)ActiveMQ集群來(lái)解決這個(gè)問(wèn)題。
在transport重連的時(shí)候,可以在connection上注冊(cè)TransportListener來(lái)獲得回調(diào),例如:
Java代碼
1. (ActiveMQConnection)connection).addTransportListener(new TransportListener() {
2. public void onCommand(Object cmd) {
3. }
4.
5. public void onException(IOException exp) {
6. }
7.
8. public void transportInterupted() {
9. // The transport has suffered an interruption from which it hopes to recover.
10. }
11.
12. public void transportResumed() {
13. // The transport has resumed after an interruption.
14. }
15. });
---------------------
我的做法是 不要在服務(wù)器端設(shè)置,而在本地設(shè)置:failover:(tcp://192.168.0.245:61616?wireFormat.maxInactivityDuration=0)