| 在面對  生產(chǎn)者-消費(fèi)者的場景下, netcore 提供了一個(gè)新的命名空間System.Threading.Channels來幫助我們更高效的處理此類問題,有了這個(gè) Channels 存在,生產(chǎn)者和消費(fèi)者可以各自處理自己的任務(wù)而不相互干擾,有利于兩方的并發(fā)處理,這篇文章我們就來討論下如何使用System.Threading.Channels。 Dataflow vs Channel在 System.Threading.Tasks.Dataflow命名空間下提供了一個(gè)數(shù)據(jù)流庫,主要封裝了存儲和處理兩大塊,該庫專注于 pipeline 處理,而System.Threading.Tasks.Channels主要專注于存儲這塊,從單一職責(zé)上來說,在生產(chǎn)者-消費(fèi)者場景下,Channels 比 Dataflow 性能要高得多。 為什么要使用 Channels可以利用 Channels 來實(shí)現(xiàn) 生產(chǎn)者和消費(fèi)者之間的解耦,大體上有兩個(gè)好處: 總的來說,在 生產(chǎn)者-消費(fèi)者模式下可以幫助我們提高應(yīng)用程序的吞吐率。 安裝 System.Threading.Channels要想使用 Channel,需要用 nuget 引用 System.Threading.Channels包,還可以通過 Visual Studio 2019 的NuGet package manager可視化界面安裝 或者 通過NuGet package manager命令行工具輸入以下命令: 
dotnet add package System.Threading.Channels
 創(chuàng)建 channel本質(zhì)上來說,你可以創(chuàng)建兩種類型的 channel,一種是有限容量的 bound channel,一種是無限容量的unbound channel,接下來的問題是,如何創(chuàng)建呢?Channels 提供了兩種 工廠方法 用于創(chuàng)建,如下代碼所示: 下面的代碼片段展示了如何創(chuàng)建 unbounded channel,并且只能存放 string 類型。 
        static void Main(string[] args)
        {
            var channel = Channel.CreateUnbounded<string>();
        }
 對了,Bounded channel還提供了一個(gè) FullMode 屬性,用于指定當(dāng) channel 已滿時(shí)該如何對插入的 message 進(jìn)行處理,通常有四種做法。 
Wait
DropWrite
DropNewest
DropOldest 下面的代碼片段展示了如何在 Bounded channel上使用 FullMode。 
        static void Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
        }
 將消息寫入到 channel要想將 message 寫入到 channel,可以使用 WriteAsync()方法,如下代碼所示: 
        static async Task Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
            await channel.Writer.WriteAsync("Hello World!");
        }
 從 channel 中讀取消息要想從 channel 中讀取 message,可以使用 ReadAsync(),如下代碼所示: 
        static async Task Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var message))
                {
                    Console.WriteLine(message);
                }
            }
        }
 System.Threading.Channels 例子下面是完整的代碼清單,展示了如何從 channel 中讀寫 message。 
    class Program
    {
        static async Task Main(string[] args)
        {
            await SingleProducerSingleConsumer();
            Console.ReadKey();
        }
        public static async Task SingleProducerSingleConsumer()
        {
            var channel = Channel.CreateUnbounded<int>();
            var reader = channel.Reader;
            for (int i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i + 1);
            }
            while (await reader.WaitToReadAsync())
            {
                if (reader.TryRead(out var number))
                {
                    Console.WriteLine(number);
                }
            }
        }
    }
 
 可以看到,控制臺中輸出了數(shù)字 1-10,這些數(shù)字正是 Writer 寫入到 channel 中的,對吧。 總的來說,要想使用 生產(chǎn)者-消費(fèi)者場景,有幾種實(shí)現(xiàn)途徑,比如:BlockingCollection 和 TPL Dataflow,但本篇介紹的 Channels 要比前面的兩種性能更高,關(guān)于 Channels 更多的細(xì)節(jié),我會在未來的文章中進(jìn)行討論,如果您現(xiàn)在想急于了解的話,可以參考MSDN: https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0 |