小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

activeMQ指南針_Queue完整分析

 開心豆豆2010 2010-09-08

 

在接觸activeMQ的這一段時間里,我們還是保持開始對它的態(tài)度,它是個優(yōu)秀的開源消息中間件。消息中間件是個非常重要的搭建企業(yè)應(yīng)用系統(tǒng)的重要組件,我們在不斷深入分析activeMQ的過程中,發(fā)現(xiàn)直到5.1這個版本,都還是存在不少問題,有些是很致命,但正因為如此,我們更加堅定了要全面掌握activeMQ,我們不想重新做“輪子”,但我們要具備在輪子壞了或不好用的情況下,要能獨立解決碰到的這些問題。下面我們通過分析網(wǎng)友提出的一個典型的問題場景,來作為我們指南針計劃的結(jié)束。

Queue作為activeMQ里面一個很重要的通訊方式,網(wǎng)友的場景如下:

測試queue持久化消息時,發(fā)送接收20W條消息。打開消息消費者,連上再斷開,反復(fù)進(jìn)行這步操作,能接收到消息,接收端有時候會阻塞,但不能完全接收完20W條消息。(其實5000條就會發(fā)生問題,不用20W這么多)

       相關(guān)背景知識:

       因為這是5.1版本的一個非常嚴(yán)重的bug,所以我們會比較詳細(xì)的進(jìn)行分析。(我們在最終解決問題后,上activeMQ官網(wǎng)上發(fā)現(xiàn)它最新的源碼是解決了該問題的,但這并不影響這個問題的典型性)。下面我們將從3個方面來分析:Queue消息的接收和發(fā)送、內(nèi)存使用機制、消息的審查(audit)、消息在文件中的存儲機制。

l         Queue消息的接收和發(fā)送



 

Queue接收消息并發(fā)給需要的消費者,具體過程如下:

1.  Queue從消息生產(chǎn)者接收消息。

2.  Queue使用一個“存儲指針”來接收這些消息。當(dāng)內(nèi)存有空閑區(qū)域時,“存儲指針”把消息放到內(nèi)存中,當(dāng)內(nèi)存不夠時,則把消息們存入磁盤文件。

3.  當(dāng)有活動的(active)的消息消費者時,Queue會首先把“存儲指針”的內(nèi)存中的消息送給消費者,當(dāng)內(nèi)存的消息被消費掉,則從磁盤文件中再讀入其他的消息(出問題處),直至消息都被消費掉了。

其中最關(guān)鍵的方法是Queue類里的doPageIn()。

 

l         內(nèi)存使用機制

activeMQ為了適應(yīng)企業(yè)級的365*24的使用,在內(nèi)存使用方面非常慎重,任何消息只有在內(nèi)存里有空閑區(qū)域時,才能放到內(nèi)存里,之后才能發(fā)給消費者。當(dāng)消息被消費者消耗掉了后,確認(rèn)信息會發(fā)給activeMQQueue接收到這些確認(rèn)消息后,會把那些被確認(rèn)的消息所占用的內(nèi)存釋放掉。

 

l         消息的審查(audit)

為了防止消息的重復(fù)發(fā)送,activeMQ采用了一個審查機制,它負(fù)責(zé)審查某條消息是否重復(fù)。它是一個最近最久未使用算法(LRU)隊列。每個隊列元素它是一個bit數(shù)組,它的運行機制如下所示:



 

 

       消息是一個個按照順序進(jìn)入bit數(shù)組,具體算法answer = (index - firstIndex) / BitArray.LONG_SIZE,其中:

BitArray.LONG_SIZE是每個bit數(shù)組的大小。

Index是消息的編號。(它是按照+1順序增加的)

firstIndex是整個LRU隊列的首Index,這個值會經(jīng)常變化,因為當(dāng)達(dá)到LRU的上限時,老的一批就被清除了,firstIndex += BitArray.LONG_SIZE。(出問題處)

 

l         消息在文件中的存儲機制

存放在文件中的消息,它們是按照如下方式進(jìn)行組織的:



 

每個消息都知道它的上一個和下一個消息,當(dāng)它自身被刪除后,相應(yīng)的關(guān)系會進(jìn)行調(diào)整。

 

問題原因分析:

    因為activeMQ在編碼實現(xiàn)的時候,原本的想法應(yīng)該是這樣的:

1.  從生產(chǎn)者接收消息,如果Queue有可用的內(nèi)存就放在內(nèi)存中,沒有則存入文件中。

2.  Queue發(fā)送消息給消費者時,先發(fā)送已經(jīng)保存在內(nèi)存中的消息。

3.  當(dāng)內(nèi)存中消息發(fā)送完后,順序讀入(這里是關(guān)鍵)文件中的消息,通過消息的審查機制,確認(rèn)不是重復(fù)消息,則放入內(nèi)存中供后續(xù)操作使用。

但是activeMQ5.1版本的實現(xiàn),問題就出在第三步的順序讀入。因為從文件中讀入它有個先決條件,那就是必須要有可用的內(nèi)存,如果沒有可用的話,就放棄本次消息讀入,并且應(yīng)該放棄這次讀取操作。但是5.1版本是繼續(xù)往下讀,這就導(dǎo)致順序錯亂,使得當(dāng)內(nèi)存可用的時候,讀入的消息在進(jìn)行審查的時候,發(fā)生錯誤,錯誤認(rèn)為它們是重復(fù)消息。這就導(dǎo)致發(fā)送20W條消息,不能保證完全收到。

 

解決方案:

KahaReferenceStore的方法recoverNextMessages里的

if (entry != null) {

                int count = 0;

                do {

                    ReferenceRecord msg = messageContainer.getValue(entry);

                    if (msg != null ) {

                        if ( recoverReference(listener, msg)) {

                            count++;

                            lastBatchId = msg.getMessageId();

                        }

                    } else {

                        lastBatchId = null;

                    }

                    batchEntry = entry;

                    entry = messageContainer.getNext(entry);

                } while (entry != null && count < maxReturned && listener.hasSpace());

            }

 

改為

         if (entry != null) {

                int count = 0;

                do {

                    ReferenceRecord msg = messageContainer.getValue(entry);

                    testTheNextMsgId(msg.getMessageId().toString());

                    if (msg != null )

                    {

                        if ( recoverReference(listener, msg))

                        {

                            count++;

                            lastBatchId = msg.getMessageId();

                            batchEntry = entry;

                            entry = messageContainer.getNext(entry);

                        }

                        else

                        {

                           break;

                        }

                    }

                    else

                    {

                        lastBatchId = null;

                        batchEntry = entry;

                        entry = messageContainer.getNext(entry);

                    }

                } while (entry != null && count < maxReturned && listener.hasSpace());

            }

 

 

activeMQ指南針計劃的結(jié)束,但它又是個新開始,我們通過這個計劃收獲了我們想要的東西了,同時我們不僅為各位朋友答疑解疑,也提供了activemqSpanner這個工具作為消息網(wǎng)絡(luò)拓?fù)鋱D工具。再一次感謝各位朋友對我們的信任。

        現(xiàn)在,我們正式啟動activeMQ笑臉計劃。它的目的不再是給大家提供解決問題的方向,而是直接解決大家碰到的各種問題,給大家?guī)バδ?。它將是一個長期堅持的事情,任何關(guān)于activeMQ使用過程的疑惑、問題、bug、功能改進(jìn),都可以在這個計劃里交流。所有在笑臉計劃中提出的問題、功能改進(jìn)、解決方案,都將完全通過網(wǎng)絡(luò)無償分享給所有人。

  • 大小: 15.2 KB
  • 大小: 9.6 KB
  • 大小: 21.8 KB

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多