小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

消息隊(duì)列及rabbitmq簡介

 huowufenghuang 2018-09-06

1.什么是消息隊(duì)列

消息隊(duì)列允許應(yīng)用間通過消息的發(fā)送與接收的方式進(jìn)行通信,當(dāng)消息接收方服務(wù)忙或不可用時,其提供了一個消息暫存的功能。

消息隊(duì)列中的幾個基本概念:

Producer: 消息發(fā)送方,也叫做消息生產(chǎn)者

Consumer: 消息接收方,也叫做消息消費(fèi)者

Broker: 消息投遞的代理

2.應(yīng)用場景

低耦合、可靠投遞、廣播、流量控制、最終一致性。

1)解耦(為面向服務(wù)的架構(gòu)(SOA)提供基本的最終一致性實(shí)現(xiàn))

基于消息的模型,關(guān)心的是“通知”,而非“處理”。

短信、郵件通知、緩存刷新等操作使用消息隊(duì)列進(jìn)行通知。

系統(tǒng)性能評價標(biāo)準(zhǔn):

非消息系統(tǒng)隊(duì)列系統(tǒng):系統(tǒng)中最慢的組件運(yùn)行時間(短板效應(yīng))

消息隊(duì)列系統(tǒng):異步通知減少短板效應(yīng)的影響

消息隊(duì)列和RPC的區(qū)別與比較:

RPC: 異步調(diào)用,及時獲得調(diào)用結(jié)果,具有強(qiáng)一致性結(jié)果,關(guān)心業(yè)務(wù)調(diào)用處理結(jié)果。

消息隊(duì)列:兩次異步RPC調(diào)用,將調(diào)用內(nèi)容在隊(duì)列中進(jìn)行轉(zhuǎn)儲,并選擇合適的時機(jī)進(jìn)行投遞(錯峰流控)

2)廣播(發(fā)布/訂閱模型,基于消息格式進(jìn)行編程)

如果沒有消息隊(duì)列,每當(dāng)一個新的業(yè)務(wù)方接入,我們都要聯(lián)調(diào)一次新接口。有了消息隊(duì)列,我們只需要關(guān)心消息是否送達(dá)了隊(duì)列,至于誰希望訂閱,是下游的事情,無疑極大地減少了開發(fā)和聯(lián)調(diào)的工作量。

3) 錯峰及流控

不同服務(wù)間的處理能力不同。比如:

WEB前端上千萬次/s的負(fù)載,數(shù)據(jù)庫tps跟不上。

沒有消息隊(duì)列時,通過滑動窗口、擁塞控制等一系列措施,可以做到服務(wù)調(diào)用間的流量控制,但是不具有通用性,且維護(hù)復(fù)雜。

通過控制消息隊(duì)列的分發(fā)速度可以實(shí)現(xiàn)通用的流量控制。利用中間系統(tǒng)轉(zhuǎn)儲兩個系統(tǒng)的通信內(nèi)容,并在下游系統(tǒng)有能力處理這些消息的時候,再處理這些消息,是一套相對較通用的方式。

總結(jié):

1. 消息隊(duì)列適用于處理非強(qiáng)一致性事務(wù)要求,非延遲敏感的業(yè)務(wù)場景。RPC可滿足強(qiáng)一致性和延遲敏感要求;

2.可以用支持最終一致性的消息隊(duì)列來實(shí)現(xiàn)輕量級、非延遲敏感的分布式事務(wù);

3.上下游業(yè)務(wù)系統(tǒng)處理能力存在較大差距,且不包含在業(yè)務(wù)主流程的功能,可以交給消息隊(duì)列;

4.有多個下游業(yè)務(wù)關(guān)心系統(tǒng)發(fā)出的通知消息時,基于標(biāo)準(zhǔn)的消息內(nèi)容格式,使用消息隊(duì)列可減少重復(fù)開發(fā)和聯(lián)調(diào)。

3. 消息隊(duì)列的基本特性

數(shù)據(jù)流轉(zhuǎn)過程:producer發(fā)送給broker, broker發(fā)送給consumer, consumer回復(fù)消費(fèi)確認(rèn),broker刪除消息

1)消息丟失(可靠性)

會導(dǎo)致消息重復(fù)及延遲。

當(dāng)有可能發(fā)生消息丟失風(fēng)險的操作前,先將消息數(shù)據(jù)落地,然后發(fā)送。失敗或超時時,不斷輪詢所有待發(fā)送消息重新發(fā)送,保證最終一定送達(dá)。消息消費(fèi)者收到消息后給服務(wù)端一個確認(rèn),當(dāng)所有訂閱者都確認(rèn)收到消息后,刪除消息。

消息重復(fù)和丟失必須得面對一個,只能在兩者間做平衡。

2)順序投遞

1.允許消息丟失。

2.從發(fā)送方到服務(wù)方到接受者都是單點(diǎn)單線程。

3)重復(fù)投遞

如何鑒別重復(fù)消息?

重復(fù)消息如何冪等處理?

i) 版本號

在一個回話周期內(nèi)維護(hù)一個單調(diào)遞增的消息版本號,檢測到小于最近接收到的版本號時判別為     重復(fù)投遞的消息。

發(fā)送方必須攜帶消息版本號;

對于嚴(yán)格要求消息順序的業(yè)務(wù),接收方必須存儲消息版本號。

ii)狀態(tài)機(jī)

主流消息隊(duì)列的設(shè)計范式里,在不丟消息的前提下,盡量減少重復(fù)消息,不保證消息的投遞順序。

4)push or pull

push 模式, 即 broker 主動推送消息給消費(fèi)者

pull 模式, 即消費(fèi)者主動從 broker 中拉取消息

4. rabbitmq(以下內(nèi)容均基于AMQP協(xié)議)

1)基本特性

* 可靠性

消息持久化、消息發(fā)送和投遞確認(rèn)機(jī)制、集群高可用方案

* 靈活路由

消息通過exchange的方式路由到不同的queue中,提供了包括fanout, direct, topic等多種exchange實(shí)現(xiàn),并且支持通過編寫exchange插件的方式自實(shí)現(xiàn)路由方案

* 支持集群

同網(wǎng)段下的rabbitmq節(jié)點(diǎn)可以通過集群的方式,組成一個邏輯上的單一broker

* Federation

通過Federation可以在跨網(wǎng)段節(jié)點(diǎn)間組件集群

* 高可用消息隊(duì)列

通過設(shè)置鏡像隊(duì)列的方式,消息可以在鏡像隊(duì)列間進(jìn)行復(fù)制,使節(jié)點(diǎn)down機(jī)或硬件損壞的情況下保證隊(duì)列服務(wù)的高可用

* 多協(xié)議支持

包括AMQP, STOMP, MQTT, HTTP等多種消息交換協(xié)議

* 多客戶端支持

JAVA, .NET, Ruby, Python, PHP, Node, Go......

* 可視化管理界面

* 豐富的插件支持

tracing, managment-plugin, and you can also write your own.

2)基礎(chǔ)模型

基礎(chǔ)組件:Publisher, Exchange, Queue, Consumer

基礎(chǔ)動作:Publish, Binding, Route, Consume

i) Queue的基本屬性:

Name

Durable (the queue will survive a broker restart)

Exclusive (used by only one connection and the queue will be deleted when that connection closes)

Auto-delete (queue is deleted when last consumer unsubscribes)

Arguments (some brokers use it to implement additional features like message TTL)

ii) Exchange路由策略

direct(default):

路由到完全匹配的routing_key對應(yīng)綁定的queue中

Fanout:

將消息路由轉(zhuǎn)發(fā)到所有綁定到該exchange上的queue

Topic:

通過比對routing_key是否匹配來路由消息。

*(star) can substitute for exactly one word.

#(hash) can substitute for zero or more words.

Header:

忽略routingKey,通過匹配header中的鍵值對方式來路由,有all(全部匹配)和any(部分匹配)兩種模式。

iii) Binding

綁定動作是將發(fā)送到exchange上的消息路由到特定queue的規(guī)則的綁定。

3)消息投遞的可靠性保證

Consumet Acknowledgements & Publisher Confirms

i) (Consumer) Delivery Acknowledgements

當(dāng)broker投遞一條消息給consumer后,broker需要獲取客戶端是否正確處理了該條消息的狀態(tài),當(dāng)所有消息訂閱者都成功確認(rèn)了該條消息時,證明該條消息投遞成功,broker會從隊(duì)列里面刪除該條消息。

Delivery Identifiers: Delivery Tags

當(dāng)一個consumer向服務(wù)器注冊了一個連接(建立起了一個channel),broker向consumer投遞消息時會攜帶一個作用域?yàn)閏hannel,單調(diào)遞增,long類型的delivery tag,consumer通過收到的delivery tag作為標(biāo)識,向broker發(fā)起消息確認(rèn)流程。

(注:delivery tag是一個64位的long類型數(shù),其最大值為:9223372036854775807,channel作用域內(nèi)超過其最大值的可能性很?。?/p>

Acknowledgements Models

basic.ack消息被成功處理

basic.nack客戶端無法處理該消息,支持一次拒絕多條消息(multiple表示reject

basic.reject客戶端無法處理該消息

Channel Prefetch Setting (QoS)

Consumer可以同時接收多少條消息。

ex: tag=5,6,7,8都處于未確認(rèn)狀態(tài),QoS=4,那么broker將不會再向該Consumer投遞任何消息,直到有一條消息被確認(rèn)后。

ii) Publisher Confirms

保證消息被準(zhǔn)確送達(dá)到broker的兩種方式:

transaction & Publisher Acknowledgements

transaction

當(dāng)channel處于事務(wù)模式下時,publisher投遞消息至broker,broker的消息落地在同一個事務(wù)中,保證了消息從publisher到broker的確定性。

事務(wù)模式相較于普通異步模式,性能上有250倍的下降。

Publisher Acknowledgements

通過delivery tag來作為消息的唯一標(biāo)識,來對publisher的消息做異步確認(rèn),對于basic.reject & nack的情況需要業(yè)務(wù)方自己對消息做暫存重試。

官方示例:http://hg./rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

消息在什么時候被確認(rèn)

消息沒有找到對應(yīng)的路由策略,broker會返回basic.nack表示broker不能正確處理該條消息;

對于持久化隊(duì)列的消息,當(dāng)消息被成功寫入磁盤后,會返回basic.ack的確認(rèn)消息,表示消息broker正常收到消息且數(shù)據(jù)已落地;

對于鏡像隊(duì)列,當(dāng)所有鏡像結(jié)點(diǎn)的對應(yīng)queue已經(jīng)成功接收到數(shù)據(jù)時,會返回broker.ack表示數(shù)據(jù)已成功同步;

rabbitmq的異步publisher confirm策略會將一段時間間隔內(nèi)(a few hundred milliseconds)的所有message寫入到磁盤,所有異步confirm大概會有幾百毫秒的延遲,但是這樣卻帶來了吞吐量的極大提高。

4)消息投遞的順序

當(dāng)消息從單publisher產(chǎn)生,通過單個exchange路由到單個queue,并被單個consumer消費(fèi)時保證消息是嚴(yán)格按照順序消費(fèi)的。

當(dāng)消息被多個consumer訂閱時,c1消費(fèi)成功,c2消費(fèi)失敗且進(jìn)行了requeue操作,此時就打亂了原來的順序。

5)TTL(time to live)

消息存活時間,當(dāng)一個消息從入隊(duì)開始計時,過了設(shè)定的TTL時間后,就認(rèn)為該消息為dead message.

6)DLX(Dead Letter Exchange)

死信隊(duì)列。當(dāng)發(fā)生如下情況時,消息會進(jìn)入DLX:

The message is rejected (basic.rejectorbasic.nack) withrequeue=false,

The TTL for the message expires; or

The queue length limit is exceeded.

總結(jié)

rabbitmq保證如下特性:

消息不丟失

消息至少被成功消費(fèi)一次

不保證消息不重復(fù),需業(yè)務(wù)方自行進(jìn)行去重邏輯(版本號&狀態(tài)機(jī)&msgid)

不保證完全按照順序消費(fèi)

5)高可用方案

cluster + ha policy

cluster機(jī)制

多個全聯(lián)通節(jié)點(diǎn)之間元信息(exchange、queue、binding等)保持強(qiáng)一致,但是隊(duì)列消息只會存儲在其中一個節(jié)點(diǎn)。

優(yōu)點(diǎn):提高吞吐量,部分解決擴(kuò)展性問題。

缺點(diǎn):不能提升數(shù)據(jù)可靠性和系統(tǒng)可用性。

ha policy機(jī)制

在cluster機(jī)制基礎(chǔ)上可以指定集群內(nèi)任意數(shù)量隊(duì)列組成鏡像隊(duì)列,隊(duì)列消息會在多節(jié)點(diǎn)間復(fù)制。實(shí)現(xiàn)數(shù)據(jù)高可靠和系統(tǒng)高可用。

設(shè)置參數(shù):ha-mode和ha-params可以細(xì)粒度(哪些節(jié)點(diǎn),哪些隊(duì)列)設(shè)置鏡像隊(duì)列。

設(shè)置參數(shù):ha-sync-mode=manual(默認(rèn))/automatic可以指定集群中新節(jié)點(diǎn)的數(shù)據(jù)同步策略。

home node:rabbitmq中每一個queue都有一個home node,稱之為queue master,所有對queue的操作都是首先通過queue master向其他節(jié)點(diǎn)進(jìn)行復(fù)制的,該機(jī)制保證了隊(duì)列的FIFO特性。

我們可以通過設(shè)置queue的x-queue-master-locator設(shè)置queue master策略

Pick the node hosting the minimum number of masters:min-masters

Pick the node the client that declares the queue is connected to:client-local

Pick a random node:random

消息節(jié)點(diǎn)故障的幾種情況

slave節(jié)點(diǎn)故障

slave節(jié)點(diǎn)故障時,集群會自動關(guān)將其剔除,不再將master的信息同步至該節(jié)點(diǎn)。

new node joining the cluster.

節(jié)點(diǎn)可在任意時間加入集群,初始化加入集群的slave隊(duì)列信息為空,但是可以接收從master新同步過來的消息。master隊(duì)列頭部消息不斷被消費(fèi),尾部不斷新增消息,當(dāng)某一時刻,在slave加入之前的歷史消息都被消費(fèi)完畢時,master隊(duì)列的size和slave的size相等,認(rèn)為slave節(jié)點(diǎn)從master的數(shù)據(jù)同步完成。

新加入的slave不能為加入之前的數(shù)據(jù)增加額外的冗余和可用性。因?yàn)閳?zhí)行明確的同步操作會使queue無響應(yīng),因此只允許非活躍的queues進(jìn)行明確的同步操作而活躍的queues進(jìn)行自然的同步操作是一種好的策略。

也可以在新的slave加入集群后,手動或自動觸發(fā)同步歷史數(shù)據(jù),但是在同步過程中,隊(duì)列queue將處于不可用狀態(tài)(類似于MySQL在更改表結(jié)構(gòu)的時候的鎖表)。

rabbitmqctl sync_queuename 手動觸發(fā)歷史消息同步

rabbitmqctl cancel_sync_queuename 取消消息同步操作

rabbitmqctl list_queues name slave_pids synchronised_slave_pids 查看正在同步的隊(duì)列

通過設(shè)置隊(duì)列的ha-sync-mode屬性automatic-新slave加入自動同步,manual-通過增量的方式同步

Stopping nodes and synchronisation

master節(jié)點(diǎn)故障,最先加入集群的slave將被提升為master,假設(shè)此時集群中存在一個已經(jīng)完全同步的節(jié)點(diǎn),即最先加入集群的slave。

集群中的節(jié)點(diǎn)繼續(xù)故障,到最終只剩下唯一的一個節(jié)點(diǎn),該節(jié)點(diǎn)為master,對于該節(jié)點(diǎn)上持久化隊(duì)列,會在其故障或者重啟前不斷持久化消息,當(dāng)故障節(jié)點(diǎn)恢復(fù)后,重新加入節(jié)點(diǎn),因?yàn)槠錈o法得知其故障之前持久化的鏡像隊(duì)列的歷史消息是否被正確投遞,rabbitmq采取的策略是清空其持久化信息,作為一個空的新slave加入集群。

Stopping master nodes with only unsynchronised slaves

當(dāng)master節(jié)點(diǎn)故障時,cluster中只有未同步完成的節(jié)點(diǎn)時,有以下兩種情況:

當(dāng)在可控條件下故障:rabbitmqctl stop, 優(yōu)雅地關(guān)閉OS時,故障節(jié)點(diǎn)將不會轉(zhuǎn)移到鏡像隊(duì)列上,此時鏡像隊(duì)列處于不可用狀態(tài)。(defalut setting)

當(dāng)不可控條件下故障,機(jī)器斷電損壞等,master將會轉(zhuǎn)移到未同步完成的節(jié)點(diǎn)上。

Loss of a master while all slaves are stopped

通常情況下,我們希望最后的節(jié)點(diǎn)在重啟后繼續(xù)接管master權(quán)限,因?yàn)樵撉闆r下,最后的master節(jié)點(diǎn)保存了最全的信息,而這些信息其他之前故障的slave節(jié)點(diǎn)并不能接收到。

然而,我們在Stopping nodes and synchronisation一節(jié)中知道,新啟動加入集群的slave會首先清空其持久化的消息信息,作為一個新節(jié)點(diǎn)加入集群,所以在這里我們需要先執(zhí)行rabbitmqctl forget_cluster_node,此時rabbitmq會嘗試尋找當(dāng)前node上的所有queue的master節(jié)點(diǎn)機(jī)器,并在master機(jī)器再次重啟加入集群后重新提升其為master節(jié)點(diǎn)。

master節(jié)點(diǎn)故障時:

一個slave被提升為新的master。被提升的slave是“最舊”的slave。因?yàn)樵搒lave與原master中內(nèi)容完全同步的幾率最大。然而,也有可能所有的salve都未與master完全同步,此時只有master中存在而slaves中不存在的message將丟失。

slave認(rèn)為所有之前的consumers的連接突然斷開了。因此,它重新將已經(jīng)投遞但還未被確認(rèn)的messages重新排隊(duì)。這些“未被確認(rèn)的”message可能包含client已發(fā)出確認(rèn)但確認(rèn)在到達(dá)master前丟失了的情形,也包含client已發(fā)出確認(rèn)且確認(rèn)已到達(dá)master但在master廣播給slaves時丟失的情形。在上述任意一種情況下,新的master都必須為他認(rèn)為未收到確認(rèn)的message重新排隊(duì)。

之前請求原master的clients被取消。

由于重新排隊(duì),從queue重新consume的clients需要知道此時可能接收到之前已經(jīng)被處理過的message。

x-cancel-on-ha-failover=true時,consuming過程將被取消,consumer cancellation notification會被發(fā)出,由業(yè)務(wù)方確認(rèn)是否重新消費(fèi)。

當(dāng)master節(jié)點(diǎn)故障時,如果publisher使用的時noAck模式,消息可能會丟失,因?yàn)閚oAck模式下,消息一旦進(jìn)入broker的處理流程,就代表消息已經(jīng)處理完成,不需要任何確認(rèn)過程。此時master發(fā)生故障,將不會requeue該消息,消息有丟失的風(fēng)險。

4.消息隊(duì)列的應(yīng)用場景舉例

可靠的延遲隊(duì)列

RPC:

Our RPC will work like this:

When the Client starts up, it creates an anonymous exclusive callback queue.

For an RPC request, the Client sends a message with two properties:replyTo, which is set to the callback queue andcorrelationId, which is set to a unique value for every request.

The request is sent to anrpc_queuequeue.

The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from thereplyTofield.

The client waits for data on the callback queue. When a message appears, it checks thecorrelationIdproperty. If it matches the value from the request it returns the response to the application.

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多