
RabbitMQ Streams–Reliable producers

Last week I introduced RabbitMQ streams and how you could produce and consume streams through the RabbitMQ.Stream.Client in .NET.

The default Producer is really low-level and leaves a lot of things to be implemented by us. For example, we have to increment the PublishingId ourselves with every Send() operation. Let’s find out how we can improve this through Reliable Producers.

Introducing Reliable Producers

The Reliable Producer builds on top of the Producer and adds the following features:

  • Provide publishingID automatically
  • Auto-Reconnect in case of disconnection
  • Trace sent and received messages
  • Invalidate messages
  • Handle the metadata update

Provide publishingID automatically

A Reliable Producer retrieves the last publishingID for its producer name (the reference) when it starts. This makes it important to choose a good reference value and to keep it stable for a given producer.

Auto-Reconnect

The Reliable Producer tries to restore the TCP connection when the Producer gets disconnected for any reason.

Trace sent and received messages

The Reliable Producer keeps each sent message in memory and removes it once the message is confirmed or times out.

Invalidate messages

If the client doesn't receive a confirmation within 2 seconds, the Reliable Producer removes the message from its internal message cache. The user receives ConfirmationStatus.TimeoutError in the ConfirmationHandler.
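To make the tracking and invalidation behaviour more concrete, here is a small stand-alone sketch of such a cache. It is not the library's implementation: `PendingMessages`, `Track`, `Confirm` and `Expire` are hypothetical names used only for illustration, and the current time is passed in explicitly to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, NOT part of RabbitMQ.Stream.Client.
// It only illustrates the idea of tracking unconfirmed messages with a timeout.
public sealed class PendingMessages
{
    // publishingId -> moment the message was sent
    private readonly Dictionary<ulong, DateTime> _sentAt = new();
    private readonly TimeSpan _timeout;

    public PendingMessages(TimeSpan timeout) => _timeout = timeout;

    // Number of messages still waiting for a confirmation.
    public int Count => _sentAt.Count;

    // Remember a message at the moment it is sent.
    public void Track(ulong publishingId, DateTime now) => _sentAt[publishingId] = now;

    // The broker confirmed the message: forget it.
    // Returns false when the message was no longer tracked (for example, already expired).
    public bool Confirm(ulong publishingId) => _sentAt.Remove(publishingId);

    // Remove every message that has waited longer than the timeout
    // and return the affected publishing ids in ascending order.
    public IReadOnlyList<ulong> Expire(DateTime now)
    {
        var expired = _sentAt
            .Where(entry => now - entry.Value > _timeout)
            .Select(entry => entry.Key)
            .OrderBy(id => id)
            .ToList();

        foreach (var id in expired)
        {
            _sentAt.Remove(id);
        }

        return expired;
    }
}
```

With a timeout of 2 seconds, a message that is still unconfirmed 2.5 seconds after it was sent is returned by `Expire` and dropped from the cache. That is the point where the real client would report the timeout status to the ConfirmationHandler.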

Handle the metadata update

If the stream topology changes (for example, the stream is deleted or a follower is added or removed), the client receives a MetadataUpdate event. The Reliable Producer detects this event and tries to reconnect the producer if the stream still exists, or closes the producer/consumer when the stream has been deleted.

Sending messages through Reliable Producers

The Reliable Producer also provides a Send() method, but it only requires the message as a parameter because the publishingId is provided automatically.
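A minimal sketch of what this can look like. It assumes the `ReliableProducer` and `ReliableProducerConfig` types as they were exposed by RabbitMQ.Stream.Client around the time of writing (newer client versions may use different names), a local broker with the stream plugin listening on port 5552, and the default guest credentials. The stream name and the reference are made-up values.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

// Assumption: local broker, stream plugin enabled, default credentials.
var system = await StreamSystem.Create(new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    Endpoints = new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) }
});

// Hypothetical stream name, used only for this example.
const string stream = "my-reliable-stream";
await system.CreateStream(new StreamSpec(stream));

var producer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig
{
    StreamSystem = system,
    Stream = stream,
    // The reference is the producer name used to look up the last publishingID.
    Reference = "my-reliable-producer",
    ConfirmationHandler = confirmation =>
    {
        // Called for every message once it is confirmed or given up on.
        Console.WriteLine(confirmation.Status == ConfirmationStatus.Confirmed
            ? "Message confirmed"
            : $"Message not confirmed: {confirmation.Status}");
        return Task.CompletedTask;
    }
});

// No publishingId argument: the Reliable Producer assigns it.
await producer.Send(new Message(Encoding.UTF8.GetBytes("hello")));
```

The ConfirmationHandler is the only place where the outcome of a Send() becomes visible, so it is the natural spot to log or retry messages that were not confirmed.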
