|
更多優(yōu)質(zhì)內(nèi)容請關(guān)注微信公眾號“AI 前線”(ID:ai-front) 消息隊(duì)列是一種幫助開發(fā)人員解決系統(tǒng)間異步通信的中間件,常用于解決系統(tǒng)解耦和請求的削峰平谷的問題。它是一種中間件,意味著它是面向研發(fā)人員而非終端用戶的產(chǎn)品,它的存在不能直接的創(chuàng)造價(jià)值,但可以有效的簡化研發(fā)的開發(fā)工作。下面,我試著用一個(gè)簡單的例子來展示下 MQ 的打開方式。 一個(gè)最簡單的電商系統(tǒng),至少包含以下流程:
相信大家都能理解為什么我們不會(huì)在一個(gè)系統(tǒng)中實(shí)現(xiàn)所有功能。 那好,我們假設(shè)三個(gè)功能簡單的對應(yīng)三個(gè)系統(tǒng): 用戶端系統(tǒng),也就是給用戶使用的前端,可以是 web 也可以是 app 或者小程序,這不重要;支付系統(tǒng),對接銀行或其他金融機(jī)構(gòu),完成付款流程;物流系統(tǒng),負(fù)責(zé)商品配送的跟蹤。 顯而易見,用戶如果想購買商品,會(huì)在前端系統(tǒng)中選中自己喜歡的商品,然后它需要付款,之后由物流人員進(jìn)行配送。 那么問題來了,當(dāng)用戶選購物品之后,前端系統(tǒng)如何將訂單消息轉(zhuǎn)給支付系統(tǒng)? 很簡單,支付系統(tǒng)可以給前端系統(tǒng)提供一個(gè)接口,當(dāng)有訂單生成時(shí),前端系統(tǒng)調(diào)用支付系統(tǒng)的接口將消息傳遞給支付系統(tǒng),完成付款流程。同樣,支付系統(tǒng)也可以把訂單消息再傳給物流系統(tǒng),邏輯清晰,架構(gòu)簡單,一切看起來都很美好,完全不需要消息隊(duì)列。 但是接下來,隨著業(yè)務(wù)的發(fā)展,又有了新的需求。公司新建了一套風(fēng)控系統(tǒng),防止用戶惡意操作。 所以風(fēng)控系統(tǒng)也需要接受到所有的訂單請求。 按照前面的方式,前端調(diào)用支付系統(tǒng)后,又多了一步調(diào)用風(fēng)控系統(tǒng)接口的操作。之后的之后又有了優(yōu)惠卷系統(tǒng),也需要獲取訂單信息,還有公司財(cái)務(wù)系統(tǒng),內(nèi)控合規(guī),大數(shù)據(jù)統(tǒng)計(jì),廣告系統(tǒng),用戶推薦系統(tǒng)。前端系統(tǒng)的研發(fā)每天忙于和各個(gè)系統(tǒng)進(jìn)行對接,完全沒有時(shí)間優(yōu)化用戶體驗(yàn)。更糟糕的是,后來老板心血來潮,要做一次 69 大促,優(yōu)惠套路及具欺騙性。屆時(shí)訂單量會(huì)有百倍的增長,各個(gè)系統(tǒng)當(dāng)前處理能力嚴(yán)重不足。理想情況下,各個(gè)系統(tǒng)可以橫向無限擴(kuò)容,簡單說就是增加服務(wù)器數(shù)量。但這樣會(huì)帶來嚴(yán)重的成本支出,尤其是當(dāng)我們考慮到有些系統(tǒng),尤其是大數(shù)據(jù)相關(guān)的其實(shí)對消息實(shí)時(shí)性要求并不高,這些成本會(huì)顯得很沒必要。 還好,God bless 碼農(nóng)。消息隊(duì)列橫空出世,拯救研發(fā)狗于水深火熱中。前端系統(tǒng)只需將訂單信息發(fā)送的 MQ 中,而不用關(guān)心都有誰需要接受訂單信息。其他所有系統(tǒng)從 MQ 中獲取消息,而且前端系統(tǒng)也不用關(guān)系其他系統(tǒng)收到消息后是否處理成功,MQ 可以幫助我們處理這些問題,這就是我們所說的系統(tǒng)節(jié)藕。大促時(shí),也只需要保證核心系統(tǒng)有充足的處理能力即可,對于處理能力較弱的系統(tǒng),在流量峰值時(shí) MQ 系統(tǒng)可以將消息暫時(shí)保存,下游系統(tǒng)可以優(yōu)哉游哉的慢慢處理,這就是我們所說的削峰平谷。 那么問題來了,MQ 為什么這么牛逼呢?它是如何實(shí)現(xiàn)這些功能的? 我們一個(gè)個(gè)慢慢講。 JMS 本質(zhì)上是 JAVA API。在 JMS 中定義了 Producer,Consumer,Provider 三種角色,Producer 作為消息的發(fā)送方,Consumer 作為消息的接收方,Provider 作為服務(wù)的提供者,Producer 和 Consumer 統(tǒng)稱為 Client。JMS 定義了點(diǎn)對點(diǎn)和發(fā)布訂閱兩種消息模型,發(fā)布訂閱模型中,通過 topic 對消息進(jìn)行路由,生產(chǎn)者可以將消息發(fā)到指定的 topic,消費(fèi)者訂閱這個(gè) topic 即可收到生產(chǎn)者發(fā)送的消息。 一個(gè)生產(chǎn)者可以向一個(gè)或多個(gè) topic 中發(fā)送消息,一個(gè)消費(fèi)者也可以消費(fèi)一個(gè)或多個(gè) topic 中的消息,一個(gè) topic 也可以有多個(gè)生產(chǎn)者或消費(fèi)者,生產(chǎn)者和消費(fèi)者只需要關(guān)聯(lián) topic,而不用關(guān)心這消息由誰發(fā)送或者消費(fèi)。 Provider 為每一個(gè) topic 維護(hù)一個(gè)或多個(gè) queue 來保存消息,消息在 queue 中是有序的,遵循先進(jìn)先出的原則,不同 queue 間的消息是無序的。點(diǎn)對點(diǎn)模式中沒有 topic 的概念,生產(chǎn)者直接將消息發(fā)送到指定 queue,消費(fèi)者也指定 queue 進(jìn)行消費(fèi),消息只能被一個(gè)消費(fèi)者消費(fèi),不可以被多個(gè)消費(fèi)者消費(fèi)。Kafka 和 RocketMQ 都實(shí)現(xiàn)了或部分實(shí)現(xiàn)了 JMS 協(xié)議。 與 JMS 不同,AMQP 是一個(gè)應(yīng)用層的網(wǎng)絡(luò)傳輸協(xié)議,對報(bào)文格式進(jìn)行定義,與開發(fā)語言無關(guān)。在 AMQP 中同樣有生產(chǎn)者,消費(fèi)者兩種角色,消息也是保存在 queue 中的。 但不同于 JMS 用 topic 對消息進(jìn)行路由,AMQP 的路由方式由 exchange 和 binding 決定。 client 可以創(chuàng)建 queue,并在創(chuàng)建 queue 的同時(shí)通知 exchange 這個(gè) queue 接受符合什么條件的消息,這個(gè)條件即為 Bingding key。生產(chǎn)者發(fā)送消息到 exchange 的時(shí)候會(huì)指定一個(gè) router key,exchange 收到消息后會(huì)與自己所維護(hù)的 Bingding key 做比較,發(fā)送到符合條件的 queue 中。消費(fèi)者在消費(fèi)時(shí)指定 queue 進(jìn)行消費(fèi)。RabbitMQ 實(shí)現(xiàn)了 AMQP 協(xié)議。 MQTT 協(xié)議是一種基于發(fā)布訂閱的輕量級協(xié)議,支持 TCP 和 UDP 兩種連接方式,主要應(yīng)用于即時(shí)通訊,小型設(shè)備,移動(dòng)應(yīng)用等領(lǐng)域。 MQTT 中有發(fā)布者(Publish),訂閱者(Subscribe)和代理服務(wù)器(Broker)三種角色。Broker 是服務(wù)的提供者,發(fā)布者和前兩種協(xié)議中的生產(chǎn)者相同,將消息(Message)發(fā)送到 Broker,Subscribe 從 Broker 中獲取消息并做業(yè)務(wù)處理。 MQTT 的 Message 中固定消息頭(Fixed header)僅有 2 字節(jié),開銷極小,除此之外分為可變頭(Variable header)和消息體(payload)兩部分。固定頭中包含消息類型,消息級別,變長頭的大小以及消息體的總長度等信息。 變長頭則根據(jù)消息類別,含有不同的標(biāo)識(shí)信息。 MQTT 允許客戶端動(dòng)態(tài)的創(chuàng)建主題,發(fā)布者與服務(wù)端建立會(huì)話(session)后,可以通過 Publish 方法發(fā)送數(shù)據(jù)到服務(wù)端的對應(yīng)主題,訂閱者通過 Subscribe 訂閱主題后,服務(wù)端就會(huì)將主題中的消息推送給對應(yīng)的訂閱者。 一般來說,系統(tǒng)間如果需要通信,除了正常情況下的消息傳遞外,還要考慮下游系統(tǒng)處理異常,上游系統(tǒng)如何處理?系統(tǒng)宕機(jī)的情況下會(huì)不會(huì)導(dǎo)致數(shù)據(jù)丟失?當(dāng)有業(yè)務(wù)數(shù)據(jù)異常時(shí),如何去定位是上游系統(tǒng)發(fā)送出了問題還是下游系統(tǒng)的問題?如果需要同時(shí)將信息發(fā)送給多個(gè)下游系統(tǒng),其中一個(gè)處理有問題會(huì)不會(huì)導(dǎo)致其它系統(tǒng)受影響?而 MQ 可以讓這些問題變得簡單。 在消費(fèi)中,一般有推消息和拉消息兩種模式。推模式即服務(wù)端收到消息后,主動(dòng)將消息推送給消費(fèi)者,由消費(fèi)者進(jìn)行處理,這種模式具有更高的實(shí)時(shí)性,但是由于服務(wù)端不能準(zhǔn)確評估消費(fèi)端的消費(fèi)性能,所以有可能造成消息推送過多使客戶端來不及處理收到的消息; 拉模式則是服務(wù)端收到消息后將消息保存在服務(wù)端,被動(dòng)的等待客戶端來拉取消息,這種模式下客戶端可以根據(jù)自己的處理能力來決定拉消息的頻率,但是缺點(diǎn)就是消息處理可能有延遲,不過可以通過長輪詢的方式來提高實(shí)時(shí)性。 消息傳遞過程中,會(huì)有各種異常導(dǎo)致消息不能正常發(fā)送,這時(shí)候,我們有以下三種選擇:
這三種方式又分別是如何實(shí)現(xiàn)的呢? 要實(shí)現(xiàn)至多一次并不難,生產(chǎn)者只需要異步發(fā)送,在發(fā)送失敗或者消費(fèi)失敗的時(shí)候不做任何處理即可。MQ 在消費(fèi)者拉走消息后,就直接將消息標(biāo)記為已經(jīng)消費(fèi)或者刪除消息。在監(jiān)控系統(tǒng)和日志系統(tǒng)中,丟失部分信息是可以接受的,但顯然,電商系統(tǒng),金融系統(tǒng)等大部分業(yè)務(wù),是不允許出現(xiàn)消息丟失這種情況的,需要保證消息一定會(huì)送達(dá)到消費(fèi)者。 至少一次的實(shí)現(xiàn)一般如下:生產(chǎn)者發(fā)消息到 MQ,MQ 收到消息后返回確認(rèn)信息(ACK)給生產(chǎn)者,生產(chǎn)者收到確認(rèn)信息后生產(chǎn)過程完成,如果在一定時(shí)間內(nèi),生產(chǎn)者沒有收到確認(rèn)信息,生產(chǎn)者重新發(fā)送消息。 重新發(fā)送的過程可以是立即發(fā)送,也可以將處理異常的消息持久化,比如保存到數(shù)據(jù)庫中,然后定時(shí)重試知道成功。 同樣,消費(fèi)者從 MQ 獲取到消息后,當(dāng)業(yè)務(wù)邏輯處理完成,向 MQ 返回 ACK 信息。 但是存在下面一種情況,當(dāng) MQ 收到消息并發(fā)送 ACK,或者消費(fèi)者消費(fèi)完成發(fā)送 ACK 信息之后,由于網(wǎng)絡(luò),系統(tǒng)故障等問題,ACK 信息沒有成功送達(dá),就會(huì)導(dǎo)致消息重復(fù)發(fā)送。 對于大部分消息隊(duì)列的實(shí)現(xiàn)來說(如 kafka,RocketMQ)對于消息重復(fù)的處理方式,就是不處理,交由消費(fèi)者根據(jù)業(yè)務(wù)邏輯自己實(shí)現(xiàn)去重或冪等。消費(fèi)者根據(jù)業(yè)務(wù)邏輯自己實(shí)現(xiàn)去重或冪等。消費(fèi)者根據(jù)業(yè)務(wù)邏輯自己實(shí)現(xiàn)去重或冪等。重要的事情說三遍。 有些人或許會(huì)覺得這是常識(shí)和基本素養(yǎng),但也有部分同學(xué)過于相信 MQ 系統(tǒng)和網(wǎng)絡(luò)環(huán)境的穩(wěn)定性,不做去重導(dǎo)致業(yè)務(wù)出現(xiàn)問題,比如優(yōu)惠卷系統(tǒng)沒有做去重處理,本來只能領(lǐng)取一張的優(yōu)惠卷,結(jié)果給用戶發(fā)了多張。 如果要實(shí)現(xiàn)正好一次的消息級別,每次消息傳遞過程正需要四次通信,過程如下: 發(fā)送端發(fā)消息給接收端,接收端收到消息后持久化保存消息 ID 并返回 REC 信息給發(fā)送端,通知生產(chǎn)端我已經(jīng)收到這個(gè)消息了。 這時(shí)消息是一種中間態(tài),接受端不會(huì)進(jìn)行業(yè)務(wù)邏輯的處理。這個(gè)過程中,如果 REC 消息丟失,服務(wù)端重傳了消息, 接受端接受到消息后會(huì)和本地保存到消息 ID 做對比,如果重復(fù),就丟棄消息不做處理,避免消息被處理多次,而且消息 ID 會(huì)持久化到硬盤,防止因?yàn)閿嚯妰?nèi)存中數(shù)據(jù)丟失倒是消息被重復(fù)處理。 發(fā)送端收到接收端返回的 rec 消息后,發(fā)送一個(gè) rel 請求給消費(fèi)端,告訴消費(fèi)端我確認(rèn)收到了你的確認(rèn)消息,接收端收到 rel 請求后才會(huì)進(jìn)行具體的業(yè)務(wù)邏輯處理,并返回 comp 信息給發(fā)送端,同時(shí)在本地刪除保存的消息 ID。如果發(fā)送端沒有收到 comp 信息,會(huì)重發(fā) rel 請求而不會(huì)重發(fā)消息。 以上,就是正好一次的實(shí)現(xiàn)過程。如果你沒看懂,那就再看一遍,如果還是沒看懂,那也沒關(guān)系,至少,你知道這玩意實(shí)現(xiàn)起來很復(fù)雜就好了。也就明白了為什么大部分消息中間件一般只保證至少一次,去重的過程交給消費(fèi)者自己處理了。 畢竟對于大多數(shù)場景,吞吐量才是首要指標(biāo)。 那么,什么場景下,需要保證正好一次呢? 答案是物聯(lián)網(wǎng)。 而標(biāo)準(zhǔn)的實(shí)現(xiàn)協(xié)議,是 MQTT。 因?yàn)槲锫?lián)網(wǎng)場景下,大部分終端是嵌入式系統(tǒng),處理能力會(huì)比服務(wù)器低很多,所以服務(wù)端需要幫助終端實(shí)現(xiàn)去重,簡化終端的業(yè)務(wù)邏輯。 生產(chǎn)者將消息發(fā)到消息隊(duì)列的 Broker 之后,如果 Broker 只將消息保存在內(nèi)存中,那當(dāng)服務(wù)器斷電或各種原因?qū)е洛礄C(jī)時(shí), 還沒有被消費(fèi)的消息將會(huì)丟失。 為了解決這個(gè)問題,可以選擇將數(shù)據(jù)持久化到硬盤,這樣當(dāng)機(jī)器故障恢復(fù)后數(shù)據(jù)還在,消費(fèi)者可以繼續(xù)消費(fèi)之前沒有消費(fèi)完的數(shù)據(jù)。但是,如果僅僅持久化到硬盤,當(dāng)服務(wù)器發(fā)生磁盤故障,Raid 卡故障時(shí),數(shù)據(jù)依然存在丟失的風(fēng)險(xiǎn)。 為了解決這個(gè)問題,絕大多數(shù)消息隊(duì)列的實(shí)現(xiàn)都引入了復(fù)制 / 多副本的概念,將每份數(shù)據(jù)都保存在多臺(tái)服務(wù)器上,而且一般這些服務(wù)器還要盡可能多實(shí)現(xiàn)跨機(jī)架甚至跨數(shù)據(jù)中心。 復(fù)制可以是同步的也可以是異步的,可以是一主一從,也可以是一主多從,也可以基于 Raft,Paxos 等算法實(shí)現(xiàn)多副本。 不管是持久化還是復(fù)制,在保證數(shù)據(jù)可靠性的同時(shí),都必然會(huì)帶來一部分性能的損耗,所以不同的消息隊(duì)列實(shí)現(xiàn)根據(jù)自己的定位,會(huì)選擇不同的復(fù)制的實(shí)現(xiàn)方式以及持久化時(shí)的文件結(jié)構(gòu)。下面,我根據(jù)自己的理解聊一聊常見的文件結(jié)構(gòu)和復(fù)制方式的優(yōu)缺點(diǎn),只是個(gè)人觀點(diǎn),僅供參考。 其實(shí)消息隊(duì)列的持久化,除了本地寫文件外,還可以持久化到 K-V 存儲(chǔ)或者關(guān)系型數(shù)據(jù)庫中,但是性能會(huì)比較差,我們就不做討論了,我們只聊聊持久化到本地文件系統(tǒng)中。而最常見的兩種文件結(jié)構(gòu),一種是 Kafka 所使用的,一種是 RocketMQ 所使用的。 Kafka 會(huì)在 Broker 上為每一個(gè) topic 創(chuàng)建一個(gè)獨(dú)立的 partiton 文件,Broker 接受到消息后,會(huì)按主題在對應(yīng)的 partition 文件中順序的追加消息內(nèi)容。而 RocketMQ 則會(huì)創(chuàng)建一個(gè) commitlog 的文件來保存分片上所有主題的消息。 Broker 接收到任意主題的消息后,都會(huì)將消息的 topic 信息,消息大小,校驗(yàn)和等信息以及消息體的內(nèi)容順序追加到 Commitlog 文件中,Commitlog 文件一般為固定大小,當(dāng)前文件達(dá)到限定大小時(shí),會(huì)創(chuàng)建一個(gè)新的文件,文件以起始便宜位置命名。 同時(shí),Broker 會(huì)為每一個(gè)主題維護(hù)各自的 ConsumerQueue 文件,文件中記錄了該主題消息的索引,包括在 Commitlog 中的偏移位置,消息大小及校驗(yàn)和,以便于在消費(fèi)時(shí)快速的定位到消息位置。ConsumerQueue 的維護(hù)是異步進(jìn)行的,不影響消息生產(chǎn)的主流程,即使 ConsumerQueue 沒有及時(shí)更新的 情況下,服務(wù)異常終止,下次啟動(dòng)時(shí)也可以根據(jù) Commitlog 文件中的內(nèi)容對 ConsumerQueue 進(jìn)行恢復(fù)。 這樣的文件結(jié)構(gòu)也就決定了,在同步刷盤的場景下,RocketMQ 是順序?qū)?,?Kafka 是隨機(jī)寫。通常情況下,我們認(rèn)為順序?qū)懙男阅苓h(yuǎn)高于隨機(jī)寫,尤其時(shí)對于傳統(tǒng)的機(jī)械硬盤來講更是如此。 且當(dāng) Broker 上的 topic 數(shù)量增多時(shí),RocketMQ 在寫消息的性能上幾乎不會(huì)受到影響,而對 Kafka 的影響則會(huì)較大。 而在消費(fèi)時(shí),因?yàn)榭梢愿鶕?jù)一定的緩存策略將熱數(shù)據(jù)提前緩存到內(nèi)存中,所以不管哪種方式對于磁盤的要求都不是太高。不過對于 RocketMQ 來說,內(nèi)存加載時(shí)會(huì)加載一整個(gè) Commitlog 文件,如果同一個(gè) Broker 上的兩個(gè)主題,一個(gè)主題的消息積壓了很長時(shí)間開始才開始消費(fèi),而另一個(gè)主題在及時(shí)消費(fèi)新發(fā)送的消息時(shí),Broker 可能會(huì)頻發(fā)的讀取文件更新到緩存中,造成磁盤性能損耗,進(jìn)而影響到生產(chǎn)時(shí)的發(fā)送性能。 所以雖然 RocketMQ 支持海量消息積壓,但如果是在共享的集群中,還是建議用戶最好能做到及時(shí)消費(fèi),保證集群中所有主題都在消費(fèi)相近時(shí)間段的消息,這樣命中內(nèi)存緩存的概率會(huì)比較高,消費(fèi)時(shí)不會(huì)帶來額外的磁盤開銷。 需要補(bǔ)充說明的是,在做技術(shù)選型時(shí),還需要考慮到硬件的發(fā)展?,F(xiàn)今固態(tài)硬盤雖然價(jià)格較機(jī)械硬盤還是高出很多,但普及度越來越高。而固態(tài)硬盤在亂序?qū)憰r(shí),性能表現(xiàn)比機(jī)械硬盤會(huì)好很多,特別是多線程同時(shí)進(jìn)行寫操作時(shí),性能也會(huì)比單線程順序?qū)憦?qiáng)。對于需要同步刷盤保證數(shù)據(jù)可靠性的應(yīng)用,磁盤讀寫性能的重要性一般來講也會(huì)遠(yuǎn)高于磁盤的空間大小。 成本上來講,如果可以顯著的提高單機(jī)性能,雖然單價(jià)來看固態(tài)硬盤更加昂貴,但是如果可以節(jié)省部分 CPU,內(nèi)存和機(jī)架位置,還是很劃算的。 復(fù)制的實(shí)現(xiàn),最簡單的方式就是一主一從的結(jié)構(gòu),開源版本的 RocketMQ 即使用了這種模式。由兩個(gè) Broker 實(shí)例組成一組服務(wù),一個(gè)作為主節(jié)點(diǎn),提供讀寫服務(wù),一個(gè)作為從節(jié)點(diǎn),在正常情況下只從主節(jié)點(diǎn)同步數(shù)據(jù)不提供讀寫服務(wù),且每個(gè) topic 都會(huì)分配到多個(gè) Broker 分組上。當(dāng)某個(gè)從節(jié)點(diǎn)發(fā)生故障時(shí),可以禁止主節(jié)點(diǎn)的寫入,依然允許消費(fèi)者繼續(xù)消費(fèi)該節(jié)點(diǎn)中未處理完成的消息。而生產(chǎn)者有新消息過來時(shí),由其它主從都健康的分組提供服務(wù), 直到故障機(jī)器恢復(fù)后主節(jié)點(diǎn)重新提供讀寫服務(wù)。如果故障機(jī)器無法恢復(fù),只需等積壓消息全部消費(fèi)完,替換故障機(jī)器即可。 如果主節(jié)點(diǎn)故障,則可以在從節(jié)點(diǎn)進(jìn)行消費(fèi),其它處理方式與從節(jié)點(diǎn)故障處理方式一致。 這種方式的優(yōu)點(diǎn)是邏輯簡單,實(shí)現(xiàn)也簡單,簡單意味著穩(wěn)定,隱藏的 bug 少。且數(shù)據(jù)只需要一份冗余,對磁盤空間的開銷相對較少,可以保證大多數(shù)情況下的數(shù)據(jù)可靠性和服務(wù)可用性。 Kafka 的復(fù)制策略,使用的是 ISR(可用服務(wù)列表)的方式,可以把他看成主從結(jié)構(gòu)的升級版。對于每一個(gè) partiton,可以分配一個(gè)或多個(gè) Broker。 其中一個(gè)作為主節(jié)點(diǎn),剩余的作為跟隨者,跟隨者會(huì)保存一個(gè) partition 副本。生產(chǎn)者將消息發(fā)送到主節(jié)點(diǎn)后,主節(jié)點(diǎn)會(huì)廣播給所有跟隨者,跟隨者收到后返回確認(rèn)信息給主節(jié)點(diǎn)。 用戶可以自由的配置副本數(shù)及當(dāng)有幾個(gè)副本寫成功后,則認(rèn)為消息成功保存。且同時(shí),會(huì)在 ZooKeeper 上維護(hù)一個(gè)可用跟隨者列表,列表中記錄所有數(shù)據(jù)和主節(jié)點(diǎn)完全同步的跟隨者列表。當(dāng)主節(jié)點(diǎn)發(fā)生故障時(shí),在列表中選擇一個(gè)跟隨者作為新的主節(jié)點(diǎn)提供服務(wù)。在這種策略下,假設(shè)總共有 m 個(gè)副本,要求至少有 n 個(gè)(0<n<m+1)副本寫成功,則系統(tǒng)可以在最多 m-n 個(gè)機(jī)器故障的情況下保證可用性。 還有一種實(shí)現(xiàn)是基于 Raft 算法實(shí)現(xiàn)的多副本機(jī)制,具體細(xì)節(jié)可以參考官方的 paper。Raft 集群一般由奇數(shù)節(jié)點(diǎn)構(gòu)成,如果要保證集群在 n 個(gè)節(jié)點(diǎn)故障的情況下可用,則至少需要有 2n+1 個(gè)節(jié)點(diǎn)。 與 ISR 方式相比,Raft 需要耗費(fèi)更多的資源,但是整個(gè)復(fù)制和選舉過程都是集群中的節(jié)點(diǎn)自主完成,不需要依賴 ZooKeeper 等第三者。 理論上 Raft 集群規(guī)??梢詿o限擴(kuò)展而 ISR 模式下集群規(guī)模會(huì)受限于 ZooKeeper 集群的處理能力。 一般情況下,因?yàn)橄⒎植荚诓煌?Broker 上,且有多個(gè)客戶端同時(shí)消費(fèi),各實(shí)例間的網(wǎng)絡(luò)狀態(tài)和處理能力都是不一定的,所以分布式消息系統(tǒng)是沒有辦法保證消息的處理順序的。但如果你了解了一般消息隊(duì)列的文件結(jié)構(gòu),你就會(huì)發(fā)現(xiàn)不管是 Kafka 的 partition 那種方式,還是 RocketMQ 的方式,都可以保證同一個(gè) partition 或者同一個(gè) ConsumerQueue 內(nèi)的消息是可以保證順序的。 剩下的,我們需要做的就是將需要保證順序的消息放入到同一個(gè) partiton 或者 queue 中就好了, 最簡單的方式是我們只為主題分配一個(gè) partition 或者 queue,這樣就可以保證嚴(yán)格的順序,但是這樣就不能體現(xiàn)分布式系統(tǒng)的性能優(yōu)勢了,集群的處理能力沒有辦法橫向擴(kuò)展。 在實(shí)際的生產(chǎn)中,大多數(shù)情況下我們其實(shí)并不需要所有的消息都順序處理,更多時(shí)候只要求具有相同特征的消息保證順序,如電商系統(tǒng)中,一般要求具有相同訂單號的消息需要保證順序,不同的訂單之間可以亂序,也就是說我們只要保證有辦法將具有相同訂單編號的消息放入到同一個(gè)隊(duì)列或者 partition 中即可。 Kafka 提供了指定 partition 發(fā)送的功能,使用者可以在客戶端根據(jù)業(yè)務(wù)邏輯自行處理,還有的消息隊(duì)列支持根據(jù)某個(gè)字段的值,將消息 hash 到消息指定消息隊(duì)列中。 指定 partition 和 hash 兩種方式的主要區(qū)別,就是當(dāng)有某個(gè)分片故障時(shí),指定 partition 的方式會(huì)導(dǎo)致部分消息發(fā)送失敗,而 hash 的方式有可能造成少量消息的亂序。 事務(wù)消息主要指消息生產(chǎn)過程中,需要確保發(fā)送操作和其它業(yè)務(wù)邏輯處理結(jié)果的一致性,要么都成功要么都失敗。 比如要同時(shí)執(zhí)行寫入 MySQL 數(shù)據(jù)庫和發(fā)送消息兩種操作,要保證寫庫成功同時(shí)發(fā)送消息也成功,如果寫庫失敗,消息也要取消發(fā)送。事務(wù)消息的實(shí)現(xiàn)一般是依賴兩步提交策略。 已寫庫并發(fā)消息為例,首先客戶端將消息發(fā)送到 Broker,Broker 收到消息后,給客戶端返回一個(gè)確認(rèn)信息。 這時(shí)消息在服務(wù)端是處于一種中間狀態(tài),消費(fèi)者不可以消費(fèi)這種狀態(tài)的消息。 客戶端收到確認(rèn)消息后,執(zhí)行寫數(shù)據(jù)庫的操作,寫庫成功后,向 Broker 再發(fā)送一個(gè)提交信息。 服務(wù)端收到提交信息后將消息更改就緒狀態(tài),允許消費(fèi)者正常消費(fèi)。 同時(shí),生產(chǎn)者客戶端還要提供一個(gè)回調(diào)方法,當(dāng) Broker 收到消息后,長時(shí)間沒有收到確認(rèn)信息時(shí),調(diào)用客戶端提供的回調(diào)方法進(jìn)行回滾,如重置數(shù)據(jù)庫。 有時(shí),消費(fèi)者可能會(huì)因?yàn)橄到y(tǒng)問題或其它原因,需要重新消費(fèi)已經(jīng)消費(fèi)完的消息。大部分消息隊(duì)列都可以實(shí)現(xiàn)這個(gè)功能,一般將次功能稱為消息回放。要實(shí)現(xiàn)消息回放的功能,需要保證消息不會(huì)在消費(fèi)成功之后立刻刪除,而是保存一段時(shí)間后,根據(jù)一定策略,如一周后刪除。同時(shí),還需要對消費(fèi)者當(dāng)前消費(fèi)的消費(fèi)位置進(jìn)行記錄,RocketMQ 和 Kafka 都會(huì)通過一個(gè) Offset 文件來記錄消費(fèi)者的消費(fèi)位置,當(dāng)消費(fèi)者消費(fèi)完成功,更新并提交 Offset。 一般來說,Offset 文件中還需要記錄最大消費(fèi)位置,即已經(jīng)入隊(duì)的最新一條消息所在的位置和最小消費(fèi)位置,即還沒刪除的最老的消息所在的位置。Offset 文件可以保存在服務(wù)端,也可以保存在客戶端,也可以保存在 ZooKeeper 中,或者其它如 Redis 之類的第三方存儲(chǔ)。 早期版本(0.9.0 之前)的 Kafka 是將消息保存在 ZooKeeper 中,之后為了減輕 ZooKeeper 的負(fù)擔(dān),將 Offset 保存到 Broker 對應(yīng)的 topic 中。 RocketMQ 則支持有兩種模式,默認(rèn)是集群模式,topic 中的每條消息只會(huì)集群中的一個(gè)實(shí)例消費(fèi),這種模式由服務(wù)端管理 Offset,還有一種是廣播模式,集群中的所有實(shí)例都會(huì)消費(fèi)一份全量消息,這種模式由客戶端管理 Offset。 Kafka 和 RocketMQ 都是優(yōu)秀的分布式消息系統(tǒng),當(dāng)需要服務(wù)于有較高高吞吐量要求的服務(wù)時(shí),都可以通過擴(kuò)容來解決需求。雖然如此,我們也不應(yīng)放棄對單機(jī)吞吐量的追求,畢竟單機(jī)處理能力越高,意味著可以節(jié)省更多資源。而決定單機(jī)性能的因素,我能想到的主要有下面幾個(gè)方面:
|
|
|