Building a producer/consumer pipeline in .NET Core using Open.ChannelExtensions

One of the lesser-known features of .NET Core is System.Threading.Channels. It allows you to implement a pipeline of producers and consumers without having to manage locks and other synchronization details yourself.

For an introduction, have a look here: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
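To make the idea concrete before the longer example, here is a minimal sketch of one producer and one consumer sharing a bounded channel. It only uses the built-in System.Threading.Channels API and assumes .NET Core 3.0 or later (for `ReadAllAsync` and `await foreach`). The class name `MinimalChannelDemo` and the helper `RunAsync` are hypothetical names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MinimalChannelDemo
{
    // Hypothetical helper: pushes `count` integers through a bounded channel
    // and returns them in the order the consumer received them.
    public static async Task<List<int>> RunAsync(int count, int capacity)
    {
        var channel = Channel.CreateBounded<int>(capacity);

        // Producer: WriteAsync waits (without blocking a thread) while the channel is full.
        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < count; i++)
                await channel.Writer.WriteAsync(i);

            // Signal that no more items will be written.
            channel.Writer.Complete();
        });

        // Consumer: the loop ends once the channel is completed and drained.
        var received = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
            received.Add(item);

        await producer;
        return received;
    }

    public static async Task Main()
    {
        var items = await RunAsync(count: 5, capacity: 2);
        Console.WriteLine(string.Join(", ", items));
    }
}
```

With a single producer and a single consumer the items arrive in the order they were written. The small capacity means the producer regularly has to wait for the consumer, which is the back pressure the longer example below relies on.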

Although it would be a good solution for a lot of use cases, I don’t see it used that often. I think the main reason is that the API is not that intuitive and it takes some time to figure out how to use it.

Let’s have a look at an example (I borrowed it from Sacha Barber’s great introduction to System.Threading.Channels):

using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        await ChannelRun(2000, 1, 100, 5);
        Console.ReadLine();
    }

    public static async Task ChannelRun(int delayMs, int numberOfReaders,
        int howManyMessages = 100, int maxCapacity = 10)
    {
        // Enforce minimums: at least 25 ms of delay and at least one reader
        var finalDelayMs = 25;
        var finalNumberOfReaders = 1;
        if (delayMs >= 25)
            finalDelayMs = delayMs;
        if (numberOfReaders >= 1)
            finalNumberOfReaders = numberOfReaders;

        // A bounded channel is useful if you have a slow consumer;
        // an unbounded one may lead to an OutOfMemoryException
        var channel = Channel.CreateBounded<string>(maxCapacity);
        var reader = channel.Reader;
        var writer = channel.Writer;

        async Task Read(ChannelReader<string> theReader, int readerNumber)
        {
            // Keep looping until the channel is completed and drained
            while (await theReader.WaitToReadAsync())
            {
                while (theReader.TryRead(out var theMessage))
                {
                    Console.WriteLine($"Reader {readerNumber} read '{theMessage}' at {DateTime.Now.ToLongTimeString()}");
                    // Simulate some work
                    await Task.Delay(finalDelayMs);
                }
            }
        }

        var tasks = new List<Task>();
        for (int i = 0; i < finalNumberOfReaders; i++)
        {
            // Copy the loop variable so the lambda does not capture the shared counter
            var number = i + 1;
            tasks.Add(Task.Run(() => Read(reader, number)));
            await Task.Delay(10);
        }

        // Write messages to the channel. Because Read has a delay, back pressure
        // is applied to the writer: WriteAsync waits whenever the channel is full.
        // Unbounded channels never make the writer wait.
        for (int i = 0; i < howManyMessages; i++)
        {
            Console.WriteLine($"Writing at {DateTime.Now.ToLongTimeString()}");
            await writer.WriteAsync($"SomeText message '{i}'");
        }

        // Tell the readers we are done writing, so they stop awaiting
        // WaitToReadAsync() forever
        writer.Complete();
        await reader.Completion;
        await Task.WhenAll(tasks);
    }
}

Although this example is rather trivial, it takes some time to wrap your head around it and understand what is going on. Let’s see if we can simplify it using the Open.ChannelExtensions library, which offers a set of extension methods that simplify and optimize System.Threading.Channels usage.

Here is the simplified code:

using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using Open.ChannelExtensions;

class Program
{
    static async Task Main(string[] args)
    {
        await ChannelRunSimplified(2000, 1, 100, 5);
        Console.ReadLine();
    }

    public static async Task ChannelRunSimplified(int delayMs, int numberOfReaders,
        int howManyMessages = 100, int maxCapacity = 10)
    {
        var finalDelayMs = 25;
        // Note: the reader count is not used below; ReadAllAsync reads with a single consumer
        var finalNumberOfReaders = 1;
        if (delayMs >= 25)
            finalDelayMs = delayMs;
        if (numberOfReaders >= 1)
            finalNumberOfReaders = numberOfReaders;

        // Messages to write to the channel. Because the reader has a delay, back pressure
        // is applied to the writer, which makes it wait whenever the channel is full.
        // Unbounded channels never make the writer wait.
        IEnumerable<string> GetMessages()
        {
            for (int i = 0; i < howManyMessages; i++)
            {
                Console.WriteLine($"Writing at {DateTime.Now.ToLongTimeString()}");
                yield return $"SomeText message '{i}'";
            }
        }

        // A bounded channel is useful if you have a slow consumer;
        // an unbounded one may lead to an OutOfMemoryException
        var pipeline = Channel
            .CreateBounded<string>(maxCapacity)
            .Source(GetMessages())
            // The second lambda argument is the index of the item, not a reader number
            .ReadAllAsync(async (theMessage, index) =>
            {
                Console.WriteLine($"Read item {index}: '{theMessage}' at {DateTime.Now.ToLongTimeString()}");
                // Simulate some work
                await Task.Delay(finalDelayMs);
            });

        // Completes once every message has been written and read
        await pipeline;
    }
}
