
RabbitMQ Streams

RabbitMQ has been the message broker of my choice for a long time. It has served me well over the years and I still like to use it today. Recently I found an extra reason to like RabbitMQ when I noticed that a new feature was added in RabbitMQ 3.9: Streams.

RabbitMQ Streams

From the documentation:

Streams are a new persistent and replicated data structure in RabbitMQ 3.9 which models an append-only log with non-destructive consumer semantics.

With streams you get Kafka-like functionality in RabbitMQ without all the complexity that comes with maintaining and managing a Kafka cluster. Streams have been created with the following use cases in mind:

  • Large number of subscribers: in traditional queuing we use a dedicated queue for each consumer. This becomes ineffective when we have a large number of consumers.
  • Time-travelling: streams allow consumers to attach at any point in the log and read from there.
  • Performance: streams have been designed with performance as a major goal.
  • Large logs: streams are designed to store larger amounts of data in an efficient manner with minimal in-memory overhead.

If you want a good introduction, have a look at the following video:

Using RabbitMQ Streams with .NET

Although I hope that there will soon be a MassTransit Rider implementation for RabbitMQ Streams, right now the way to go is through the RabbitMQ.Stream.Client NuGet package.

Let’s build a small sample application to try it out…

Enable the RabbitMQ Streams plugin

Before we can start we need to enable RabbitMQ Streams. This is available as a separate plugin and is not activated out of the box:

rabbitmq-plugins enable rabbitmq_stream

Tip: If you want to test it locally, you can use the RabbitMQ image I've created; its Dockerfile has this plugin already enabled: https://github.com/wullemsb/docker-rabbitmq

Create the stream

First we need to construct a configuration object. RabbitMQ Streams uses a separate port (by default this is port 5552).

var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    Endpoints = new List<EndPoint>() {
        new IPEndPoint(IPAddress.Loopback, 5552)
    },
    VirtualHost = "/"
};
// Connect to the broker
var system = await StreamSystem.Create(config);

Now that we are connected to our RabbitMQ cluster, we can create a new stream through CreateStream(). This is an idempotent operation, so you can safely call it multiple times.

const string stream = "my_first_stream";
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 200000,
});

It is important when creating the stream to specify a retention policy to prevent the stream from growing infinitely. In our example we limit the stream to 200000 bytes.
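
Besides MaxLengthBytes, the StreamSpec also exposes age-based retention. As a sketch (I'm assuming here that MaxAge behaves as its name suggests; check the RabbitMQ.Stream.Client documentation for the exact semantics), a time-limited stream could look like this:

await system.CreateStream(new StreamSpec("my_time_limited_stream")
{
    // Assumed settings: cap both the total size and the age of the data.
    // Note that retention is evaluated per segment, so the limits are not exact.
    MaxLengthBytes = 200000,
    MaxAge = TimeSpan.FromDays(7)
});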

Add a producer

Now it is time to create our producer.

And we can start publishing events through the Send() method.

var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Here you receive the message confirmations;
        // a confirmation means the message is stored on the server
        ConfirmHandler = conf =>
        {
            Console.WriteLine($"message: {conf.PublishingId} - confirmed");
        }
    });

The Send() method expects a publishingId, which should be incremented for each send, and our message payload.

for (ulong i = 0; i < 100; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
    await producer.Send(i, message);
}
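
Because we set a Reference on the producer, the broker can deduplicate messages based on the publishingId. A small sketch of what resuming after a restart could look like, assuming a stable Reference (instead of the random Guid above) and assuming GetLastPublishingId() returns the last id stored for that reference:

// Sketch: continue publishing where a previous run stopped.
// Assumes the producer was created with the same Reference as before.
var lastId = await producer.GetLastPublishingId();
for (var i = lastId + 1; i <= lastId + 100; i++)
{
    await producer.Send(i, new Message(Encoding.UTF8.GetBytes($"hello {i}")));
}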

Add a consumer

Almost there. Time to consume these published messages…

var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        OffsetSpec = new OffsetTypeFirst(),
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine($"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())} - consumed");
            await Task.CompletedTask;
        }
    });

Notice the OffsetSpec. This allows us to specify where in the stream consumption should start. In this example we have set it to OffsetTypeFirst, meaning the beginning of the stream.
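
The client also offers other OffsetSpec implementations, such as OffsetTypeLast, OffsetTypeNext, OffsetTypeOffset and OffsetTypeTimestamp. As a sketch (the consumer reference "replay-consumer" and the offset value are just placeholders), attaching at a specific offset could look like this:

// Sketch: attach a consumer at an absolute offset in the stream instead
// of at the beginning. Other options: OffsetTypeLast() starts at the last
// written chunk, OffsetTypeNext() only receives newly published messages.
var replayConsumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "replay-consumer",
        Stream = stream,
        OffsetSpec = new OffsetTypeOffset(50), // placeholder offset
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine($"replayed: {Encoding.UTF8.GetString(message.Data.Contents.ToArray())}");
            await Task.CompletedTask;
        }
    });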

If we now run the application, we see every message being confirmed by the broker and then picked up again by our consumer.

Here is the full example:

using RabbitMQ.Stream.Client;
using System.Buffers;
using System.Net;
using System.Text;

var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    Endpoints = new List<EndPoint>() {
        new IPEndPoint(IPAddress.Loopback, 5552)
    },
    VirtualHost = "/"
};
// Connect to the broker
var system = await StreamSystem.Create(config);

const string stream = "my_first_stream";
// Create the stream. It is important to set a retention policy,
// in this case 200000 bytes.
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 200000,
});

var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Here you receive the message confirmations;
        // a confirmation means the message is stored on the server
        ConfirmHandler = conf =>
        {
            Console.WriteLine($"message: {conf.PublishingId} - confirmed");
        }
    });

// Publish the messages and set the publishingId,
// which should be sequential
for (ulong i = 0; i < 100; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
    await producer.Send(i, message);
}

// Not mandatory. Just here to show the confirmations
Thread.Sleep(TimeSpan.FromSeconds(1));

// Create a consumer
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Consume the stream from the beginning
        // See also the other OffsetSpec implementations
        OffsetSpec = new OffsetTypeFirst(),
        // Receive the messages
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine($"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())} - consumed");
            await Task.CompletedTask;
        }
    });

Console.WriteLine("Press to stop");
Console.ReadLine();

await producer.Close();
await consumer.Close();
await system.DeleteStream(stream);
await system.Close();
