One of the lesser known features of .NET Core is System.Threading.Channels. It allows you to implement a pipeline of producers/consumers without having to worry about locking, concurrency and so on.
For an introduction, have a look here: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
Although it would be a good solution for a lot of use cases, I don’t see it used that often. I think the main reason is that the API is not that intuitive and it takes some time to figure out how to use it.
Let’s have a look at an example (I borrowed it from Sacha Barber’s great introduction to System.Threading.Channels):
class Program
{
    static async Task Main(string[] args)
    {
        await ChannelRun(2000, 1, 100, 5);
        Console.ReadLine();
    }

    /// <summary>
    /// Demonstrates a bounded channel with one writer and one or more readers.
    /// The writer pushes <paramref name="howManyMessages"/> messages; each reader
    /// prints the messages it receives and then waits to simulate slow work.
    /// </summary>
    /// <param name="delayMs">Simulated per-message work time in milliseconds; values below 25 are raised to 25.</param>
    /// <param name="numberOfReaders">Number of concurrent readers; values below 1 are raised to 1.</param>
    /// <param name="howManyMessages">Number of messages the writer produces.</param>
    /// <param name="maxCapacity">Capacity passed to <c>Channel.CreateBounded</c>.</param>
    /// <returns>A task that completes once the channel is completed and every reader has finished.</returns>
    public static async Task ChannelRun(int delayMs, int numberOfReaders,
        int howManyMessages = 100, int maxCapacity = 10)
    {
        // Clamp the inputs to their minimums (25 ms delay, 1 reader).
        var finalDelayMs = Math.Max(delayMs, 25);
        var finalNumberOfReaders = Math.Max(numberOfReaders, 1);

        // A bounded channel is useful if you have a slow consumer;
        // an unbounded one may lead to an OutOfMemoryException.
        var channel = Channel.CreateBounded<string>(maxCapacity);
        var reader = channel.Reader;
        var writer = channel.Writer;

        // Drains the channel until the writer marks it complete.
        async Task Read(ChannelReader<string> theReader, int readerNumber)
        {
            // WaitToReadAsync yields false once the channel is completed and empty.
            while (await theReader.WaitToReadAsync())
            {
                while (theReader.TryRead(out var theMessage))
                {
                    Console.WriteLine($"Reader {readerNumber} read '{theMessage}' at {DateTime.Now.ToLongTimeString()}");
                    // Simulate some work, using the clamped delay rather than the raw argument.
                    await Task.Delay(finalDelayMs);
                }
            }
        }

        var tasks = new List<Task>();
        for (int i = 0; i < finalNumberOfReaders; i++)
        {
            // Snapshot the number for this iteration: the lambda below would otherwise
            // capture the shared loop variable and could observe a later value of i.
            var assignedNumber = i + 1;
            tasks.Add(Task.Run(() => Read(reader, assignedNumber)));
            await Task.Delay(10);
        }

        // Write messages to the channel. Because Read delays after every message,
        // the bounded channel fills up and WriteAsync waits for free space
        // (back pressure). Unbounded channels never make the writer wait.
        for (int i = 0; i < howManyMessages; i++)
        {
            Console.WriteLine($"Writing at {DateTime.Now.ToLongTimeString()}");
            await writer.WriteAsync($"SomeText message '{i}");
        }

        // Tell the readers we are done writing, so that they stop awaiting
        // WaitToReadAsync() forever.
        writer.Complete();
        await reader.Completion;
        await Task.WhenAll(tasks);
    }
}
Although this example is rather trivial, it takes some time to wrap your head around it and understand what is going on. Let’s see if we can simplify it using Open.ChannelExtensions, a library that offers a set of extension methods for optimizing and simplifying System.Threading.Channels usage.
Here is the simplified code:
class Program
{
    static async Task Main(string[] args)
    {
        await ChannelRunSimplified(2000, 1, 100, 5);
        Console.ReadLine();
    }

    /// <summary>
    /// Same producer/consumer demo as the hand-written channel version, expressed
    /// as a single Open.ChannelExtensions pipeline: a lazily generated message
    /// sequence feeds a bounded channel whose items are printed one by one, with a
    /// delay after each item to simulate slow work.
    /// </summary>
    /// <param name="delayMs">Simulated per-message work time in milliseconds; values below 25 are raised to 25.</param>
    /// <param name="numberOfReaders">
    /// Kept for signature compatibility; it is not used by this pipeline.
    /// NOTE(review): Open.ChannelExtensions appears to offer ReadAllConcurrentlyAsync
    /// for multiple readers - confirm against the library before wiring it in.
    /// </param>
    /// <param name="howManyMessages">Number of messages to produce.</param>
    /// <param name="maxCapacity">Capacity passed to <c>Channel.CreateBounded</c>.</param>
    /// <returns>A task that completes when the pipeline has finished.</returns>
    public static async Task ChannelRunSimplified(int delayMs, int numberOfReaders,
        int howManyMessages = 100, int maxCapacity = 10)
    {
        // Clamp the delay to its 25 ms minimum.
        var finalDelayMs = Math.Max(delayMs, 25);

        // Lazily produces the messages; each one is logged as it is generated.
        IEnumerable<string> GetMessages()
        {
            for (int i = 0; i < howManyMessages; i++)
            {
                Console.WriteLine($"Writing at {DateTime.Now.ToLongTimeString()}");
                yield return $"SomeText message '{i}";
            }
        }

        // A bounded channel is useful if you have a slow consumer;
        // an unbounded one may lead to an OutOfMemoryException.
        // Because the receiver delays after every message, the channel fills up and
        // back pressure is applied to the source.
        var pipeline = Channel
            .CreateBounded<string>(maxCapacity)
            .Source(GetMessages())
            // NOTE(review): the second lambda argument is supplied by the library and
            // is presumably the item's index rather than a reader number - verify
            // against the Open.ChannelExtensions documentation.
            .ReadAllAsync(async (theMessage, readerNumber) =>
            {
                Console.WriteLine($"Reader {readerNumber} read '{theMessage}' at {DateTime.Now.ToLongTimeString()}");
                // Simulate some work, using the clamped delay rather than the raw argument.
                await Task.Delay(finalDelayMs);
            });

        await pipeline;
    }
}
} |