小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

C#隊(duì)列學(xué)習(xí)筆記:RabbitMQ延遲隊(duì)列

 悅光陰 2021-04-05

    一、引言

    日常生活中,很多的APP都有延遲隊(duì)列的影子。比如在手機(jī)淘寶上,經(jīng)常遇到APP派發(fā)的限時(shí)消費(fèi)紅包,一般有幾個(gè)小時(shí)或24小時(shí)不等。假如在紅包倒計(jì)時(shí)的過(guò)程中,沒(méi)有消費(fèi)掉紅包的話,紅包會(huì)自動(dòng)失效。假如上述行為使用RabbitMQ延時(shí)隊(duì)列來(lái)理解的話,就是在你收到限時(shí)消費(fèi)紅包的時(shí)候,手機(jī)淘寶會(huì)自動(dòng)發(fā)一條延時(shí)消息到隊(duì)列中以供消費(fèi)。在規(guī)定時(shí)間內(nèi),則可正常消費(fèi),否則依TTL自動(dòng)失效。

    在RabbitMQ中,有兩種方式來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列:一種是基于隊(duì)列方式,另外一種是基于消息方式。

    二、示例

    2.1、發(fā)送端(生產(chǎn)端)

    新建一個(gè)控制臺(tái)項(xiàng)目Send,并添加一個(gè)類RabbitMQConfig。

    class RabbitMQConfig
    {
        public static string Host { get; set; }

        public static string VirtualHost { get; set; }

        public static string UserName { get; set; }

        public static string Password { get; set; }

        public static int Port { get; set; }

        static RabbitMQConfig()
        {
            Host = "192.168.2.242";
            VirtualHost = "/";
            UserName = "hello";
            Password = "world";
            Port = 5672;
        }
    }
RabbitMQConfig.cs
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("C# RabbitMQ實(shí)現(xiàn)延遲隊(duì)列有以下兩種方式:");
            Console.WriteLine("1、基于隊(duì)列方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按1開(kāi)始生產(chǎn)。");
            Console.WriteLine("2、基于消息方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按2開(kāi)始生產(chǎn)。");

            string chooseChar = Console.ReadLine();
            if (chooseChar == "1")
            {
                DelayMessagePublishByQueueExpires();
            }
            else if (chooseChar == "2")
            {
                DelayMessagePublishByMessageTTL();
            }
            Console.ReadLine();
        }

        /// <summary>
        /// 基于隊(duì)列方式實(shí)現(xiàn)延遲隊(duì)列
        /// 將隊(duì)列中所有消息的TTL(Time To Live,即過(guò)期時(shí)間)設(shè)置為一樣
        /// </summary>
        private static void DelayMessagePublishByQueueExpires()
        {
            const string MessagePrefix = "message_";
            const int PublishMessageCount = 6;
            const int QuequeExpirySeconds = 1000 * 30;
            const int MessageExpirySeconds = 1000 * 10;

            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //當(dāng)同時(shí)指定了queue和message的TTL值,則兩者中較小的那個(gè)才會(huì)起作用。
                    Dictionary<string, object> dict = new Dictionary<string, object>
                    {
                        { "x-expires", QuequeExpirySeconds },//隊(duì)列過(guò)期時(shí)間
                        { "x-message-ttl", MessageExpirySeconds },//消息過(guò)期時(shí)間
                        { "x-dead-letter-exchange", "dead exchange 1" },//過(guò)期消息轉(zhuǎn)向路由
                        { "x-dead-letter-routing-key", "dead routing key 1" }//過(guò)期消息轉(zhuǎn)向路由的routing key
                    };

                    //聲明隊(duì)列
                    channel.QueueDeclare(queue: "delay1", durable: true, exclusive: false, autoDelete: false, arguments: dict);


                    //向該消息隊(duì)列發(fā)送消息message
                    for (int i = 0; i < PublishMessageCount; i++)
                    {
                        var message = MessagePrefix + i.ToString();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "", routingKey: "delay1", basicProperties: null, body: body);
                        Thread.Sleep(1000 * 2);
                        Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}");
                    }
                }
            }
        }

        /// <summary>
        /// 基于消息方式實(shí)現(xiàn)延遲隊(duì)列
        /// 對(duì)隊(duì)列中消息進(jìn)行單獨(dú)設(shè)置,每條消息的TTL可以不同。
        /// </summary>
        private static void DelayMessagePublishByMessageTTL()
        {
            const string MessagePrefix = "message_";
            const int PublishMessageCount = 6;
            int MessageExpirySeconds = 0;

            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Dictionary<string, object> dict = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "dead exchange 2" },//過(guò)期消息轉(zhuǎn)向路由
                        { "x-dead-letter-routing-key", "dead routing key 2" }//過(guò)期消息轉(zhuǎn)向路由的routing key
                    };

                    //聲明隊(duì)列
                    channel.QueueDeclare(queue: "delay2", durable: true, exclusive: false, autoDelete: false, arguments: dict);

                    //向該消息隊(duì)列發(fā)送消息message
                    Random random = new Random();
                    for (int i = 0; i < PublishMessageCount; i++)
                    {
                        MessageExpirySeconds = i * 1000;
                        var properties = channel.CreateBasicProperties();
                        properties.Expiration = MessageExpirySeconds.ToString();
                        var message = MessagePrefix + i.ToString();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "", routingKey: "delay2", basicProperties: properties, body: body);
                        Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}");
                    }
                }
            }
        }
    }
Program.cs

    2.2、接收端(消費(fèi)端)

    新建一個(gè)控制臺(tái)項(xiàng)目Receive,按住Alt鍵,將發(fā)送端RabbitMQConfig類拖一個(gè)快捷方式到Receive項(xiàng)目中。

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("C# RabbitMQ實(shí)現(xiàn)延遲隊(duì)列有以下兩種方式:");
            Console.WriteLine("1、基于隊(duì)列方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按1開(kāi)始消費(fèi)。");
            Console.WriteLine("2、基于消息方式實(shí)現(xiàn)延遲隊(duì)列,請(qǐng)按2開(kāi)始消費(fèi)。");

            string chooseChar = Console.ReadLine();
            if (chooseChar == "1")
            {
                DelayMessageConsumeByQueueExpires();
            }
            else if (chooseChar == "2")
            {
                DelayMessageConsumeByMessageTTL();
            }
            Console.ReadLine();
        }

        public static void DelayMessageConsumeByQueueExpires()
        {
            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "dead exchange 1", type: "direct");
                    string name = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: name, exchange: "dead exchange 1", routingKey: "dead routing key 1");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                    };
                    channel.BasicConsume(queue: name, noAck: true, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }

        public static void DelayMessageConsumeByMessageTTL()
        {
            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "dead exchange 2", type: "direct");
                    string name = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: name, exchange: "dead exchange 2", routingKey: "dead routing key 2");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                    };
                    channel.BasicConsume(queue: name, noAck: true, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
Program.cs

    2.3、運(yùn)行結(jié)果

-----------------------------------------------------------------------------------------------------------

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多