Skip to main content

RabbitMQ Streams

RabbitMQ has been the message broker of my choice for a long time. It has served me well over the years and I still like to use it today. Recently, I was able to add an extra reason to the list why I like RabbitMQ when I noticed that a new feature was added in RabbitMQ 3.9; Streams.

RabbitMQ Streams

From the documentation:

Streams are a new persistent and replicated data structure in RabbitMQ 3.9 which models an append-only log with non-destructive consumer semantics.

With streams you get Kafka like functionality in RabbitMQ without all the complexity that comes with maintaining and managing your Kafka cluster. It has been created with the following use cases in mind:

  • Large amount of subscribers; in traditional queuing we use a dedicated queue for each consumers. This becomes ineffective whehn we have large number of consumers.
  • Time-travelling; Streams will allow consumers to attach at any point in the log and read from there.
  • Performance: Streams have been designed with performance as a major goal
  • Large logs: Streams are designed to store larger amounts of data in an efficient manner with minimal in-memory overhead.

If you want a good introduction, have a look at the following video:

Using RabbitMQ Streams with .NET

Although I hope that there will soon be a MassTransit Rider implementation for RabbitMQ Streams, right now the way to go is through the RabbitMQ.Stream.Client NuGet package.

Let’s build a small sample application to try it out…

Enable the RabbitMQ Streams plugin

Before we can start we need to enable RabbitMQ Streams. This is available as a separate plugin and is not activated out of the box:

rabbitmq-plugins enable rabbitmq_stream

Tip: If you want to test it locally you can use the RabbitMQ image I’ve created a Dockerfile where this plugin is already enabled: https://github.com/wullemsb/docker-rabbitmq

Create the stream

First we need to construct a configuration object. RabbitMQ streams is using a separate port (by default this is port 5552).

Now that we are connected to our RabbitMQ cluster, we can create a new Stream through CreateStream(). This is an idempotent operation so you can safely call this multiple times.

It is important when creating the stream to specify a retention policy to prevent the stream from growing infinitely. In our example we limit the queue to 200000 bytes.

Add a producer

Now it is time to create our producer.

And we can start publishing events through the Send() method.

The Send() method expect a publishingId that should be incremented for each send and our message payload.

Add a consumer

Almost there. Time to consume these published messages…

Notice the OffsetSpec. This allows us to specify from where the stream should be consumed. In this example we have set it to OffsetTypeFirst meaning the beginning of the stream.

    If we now run the application, we get output like this:

    Here is the full example:

    Popular posts from this blog

    Azure DevOps/ GitHub emoji

    I’m really bad at remembering emoji’s. So here is cheat sheet with all emoji’s that can be used in tools that support the github emoji markdown markup: All credits go to rcaviers who created this list.

    Podman– Command execution failed with exit code 125

    After updating WSL on one of the developer machines, Podman failed to work. When we took a look through Podman Desktop, we noticed that Podman had stopped running and returned the following error message: Error: Command execution failed with exit code 125 Here are the steps we tried to fix the issue: We started by running podman info to get some extra details on what could be wrong: >podman info OS: windows/amd64 provider: wsl version: 5.3.1 Cannot connect to Podman. Please verify your connection to the Linux system using `podman system connection list`, or try `podman machine init` and `podman machine start` to manage a new Linux VM Error: unable to connect to Podman socket: failed to connect: dial tcp 127.0.0.1:2655: connectex: No connection could be made because the target machine actively refused it. That makes sense as the podman VM was not running. Let’s check the VM: >podman machine list NAME         ...

    Cleaner switch expressions with pattern matching in C#

    Ever find yourself mapping multiple string values to the same result? Being a C# developer for a long time, I sometimes forget that the C# has evolved so I still dare to chain case labels or reach for a dictionary. Of course with pattern matching this is no longer necessary. With pattern matching, you can express things inline, declaratively, and with zero repetition. A small example I was working on a small script that should invoke different actions depending on the environment. As our developers were using different variations for the same environment e.g.  "tst" alongside "test" , "prd" alongside "prod" .  We asked to streamline this a long time ago, but as these things happen, we still see variations in the wild. This brought me to the following code that is a perfect example for pattern matching: The or keyword here is a logical pattern combinator , not a boolean operator. It matches if either of the specified pattern...