一、引言日常生活中,很多的APP都有延遲隊(duì)列的影子。比如在手機(jī)淘寶上,經(jīng)常遇到APP派發(fā)的限時(shí)消費(fèi)紅包,一般有幾個(gè)小時(shí)或24小時(shí)不等。假如在紅包倒計(jì)時(shí)的過(guò)程中,沒(méi)有消費(fèi)掉紅包的話,紅包會(huì)自動(dòng)失效。假如上述行為使用RabbitMQ延時(shí)隊(duì)列來(lái)理解的話,就是在你收到限時(shí)消費(fèi)紅包的時(shí)候,手機(jī)淘寶會(huì)自動(dòng)發(fā)一條延時(shí)消息到隊(duì)列中以供消費(fèi)。在規(guī)定時(shí)間內(nèi),則可正常消費(fèi),否則依TTL自動(dòng)失效。 在RabbitMQ中,有兩種方式來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列:一種是基于隊(duì)列方式,另外一種是基于消息方式。
二、示例2.1、發(fā)送端(生產(chǎn)端)新建一個(gè)控制臺(tái)項(xiàng)目Send,并添加一個(gè)類RabbitMQConfig。
class RabbitMQConfig { public static string Host { get; set; } public static string VirtualHost { get; set; } public static string UserName { get; set; } public static string Password { get; set; } public static int Port { get; set; } static RabbitMQConfig() { Host = "192.168.2.242"; VirtualHost = "/"; UserName = "hello"; Password = "world"; Port = 5672; } }
class Program { static void Main(string[] args) { Console.WriteLine("C# RabbitMQ實(shí)現(xiàn)延遲隊(duì)列有以下兩種方式:"); Console.WriteLine("1、基于隊(duì)列方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按1開(kāi)始生產(chǎn)。"); Console.WriteLine("2、基于消息方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按2開(kāi)始生產(chǎn)。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessagePublishByQueueExpires(); } else if (chooseChar == "2") { DelayMessagePublishByMessageTTL(); } Console.ReadLine(); } /// <summary> /// 基于隊(duì)列方式實(shí)現(xiàn)延遲隊(duì)列 /// 將隊(duì)列中所有消息的TTL(Time To Live,即過(guò)期時(shí)間)設(shè)置為一樣 /// </summary> private static void DelayMessagePublishByQueueExpires() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; const int QuequeExpirySeconds = 1000 * 30; const int MessageExpirySeconds = 1000 * 10; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //當(dāng)同時(shí)指定了queue和message的TTL值,則兩者中較小的那個(gè)才會(huì)起作用。 Dictionary<string, object> dict = new Dictionary<string, object> { { "x-expires", QuequeExpirySeconds },//隊(duì)列過(guò)期時(shí)間 { "x-message-ttl", MessageExpirySeconds },//消息過(guò)期時(shí)間 { "x-dead-letter-exchange", "dead exchange 1" },//過(guò)期消息轉(zhuǎn)向路由 { "x-dead-letter-routing-key", "dead routing key 1" }//過(guò)期消息轉(zhuǎn)向路由的routing key }; //聲明隊(duì)列 channel.QueueDeclare(queue: "delay1", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向該消息隊(duì)列發(fā)送消息message for (int i = 0; i < PublishMessageCount; i++) { var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "delay1", basicProperties: null, body: body); Thread.Sleep(1000 * 2); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } /// <summary> /// 基于消息方式實(shí)現(xiàn)延遲隊(duì)列 /// 對(duì)隊(duì)列中消息進(jìn)行單獨(dú)設(shè)置,每條消息的TTL可以不同。 /// </summary> private static void DelayMessagePublishByMessageTTL() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; int MessageExpirySeconds = 0; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Dictionary<string, object> dict = new Dictionary<string, object> { { "x-dead-letter-exchange", "dead exchange 2" },//過(guò)期消息轉(zhuǎn)向路由 { "x-dead-letter-routing-key", "dead routing key 2" }//過(guò)期消息轉(zhuǎn)向路由的routing key }; //聲明隊(duì)列 channel.QueueDeclare(queue: "delay2", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向該消息隊(duì)列發(fā)送消息message Random random = new Random(); for (int i = 0; i < PublishMessageCount; i++) { MessageExpirySeconds = i * 1000; var properties = channel.CreateBasicProperties(); properties.Expiration = MessageExpirySeconds.ToString(); var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "delay2", basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } } 2.2、接收端(消費(fèi)端)新建一個(gè)控制臺(tái)項(xiàng)目Receive,按住Alt鍵,將發(fā)送端RabbitMQConfig類拖一個(gè)快捷方式到Receive項(xiàng)目中。
class Program { static void Main(string[] args) { Console.WriteLine("C# RabbitMQ實(shí)現(xiàn)延遲隊(duì)列有以下兩種方式:"); Console.WriteLine("1、基于隊(duì)列方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按1開(kāi)始消費(fèi)。"); Console.WriteLine("2、基于消息方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按2開(kāi)始消費(fèi)。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessageConsumeByQueueExpires(); } else if (chooseChar == "2") { DelayMessageConsumeByMessageTTL(); } Console.ReadLine(); } public static void DelayMessageConsumeByQueueExpires() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "dead exchange 1", type: "direct"); string name = channel.QueueDeclare().QueueName; channel.QueueBind(queue: name, exchange: "dead exchange 1", routingKey: "dead routing key 1"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }; channel.BasicConsume(queue: name, noAck: true, consumer: consumer); Console.ReadKey(); } } } public static void DelayMessageConsumeByMessageTTL() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "dead exchange 2", type: "direct"); string name = channel.QueueDeclare().QueueName; channel.QueueBind(queue: name, exchange: "dead exchange 2", routingKey: "dead routing key 2"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }; channel.BasicConsume(queue: name, noAck: true, consumer: consumer); Console.ReadKey(); } } } } 2.3、運(yùn)行結(jié)果
-----------------------------------------------------------------------------------------------------------
|
|
|