I couldn’t find a good example of how best to integrate the RabbitMQ C# client into a .NET Core application, so it’s time to write a post about it.
I recommend using hosted services, either by implementing IHostedService directly or by inheriting from BackgroundService.
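For comparison, here is a minimal sketch of the first option, implementing IHostedService directly. The class name MinimalHostedService and its IsRunning property are hypothetical and only illustrate the contract: the host calls StartAsync once at startup and StopAsync once on graceful shutdown. BackgroundService is an abstract class that implements this same interface and adds an ExecuteAsync method for long-running work.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical minimal service, shown only to illustrate the IHostedService contract.
public sealed class MinimalHostedService : IHostedService
{
    // Illustrative flag so the lifecycle is observable; not part of the interface.
    public bool IsRunning { get; private set; }

    // Called once by the host at startup; this is where connections would be opened.
    public Task StartAsync(CancellationToken cancellationToken)
    {
        IsRunning = true;
        return Task.CompletedTask;
    }

    // Called once on graceful shutdown; this is where resources would be released.
    public Task StopAsync(CancellationToken cancellationToken)
    {
        IsRunning = false;
        return Task.CompletedTask;
    }
}
```

With this approach you manage any long-running loop yourself, which is why the rest of this post goes with BackgroundService instead.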
Let’s see how to do it using the BackgroundService:
- We create a new class that inherits from BackgroundService:
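    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    public class RabbitMQHostedService: BackgroundService {
        private ConnectionFactory _connectionFactory;
        private IConnection _connection;
        private IModel _channel;
        public RabbitMQHostedService()
        {
        }
    }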
- We’ll use the StartAsync method to create the ConnectionFactory, the connection and a channel to listen on:
Remark: Notice the DispatchConsumersAsync = true in the ConnectionFactory configuration. This setting is required to use an async consumer. Without it, the AsyncEventingBasicConsumer will not pick up any messages.
    public override Task StartAsync(CancellationToken cancellationToken)
    {
        _connectionFactory = new ConnectionFactory
        {
            UserName = "<user>",
            Password = "<pw>",
            DispatchConsumersAsync = true
        };
        _connection = _connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "<queuename>",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);
        return base.StartAsync(cancellationToken);
    }
- We also implement the StopAsync method to clean up when our background service shuts down:
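    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        // Stop the background task first, then release the RabbitMQ resources.
        await base.StopAsync(cancellationToken);
        // Closing the connection also closes the channels opened on it.
        _connection.Close();
    }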
- Now the only thing left is to create a consumer to start receiving messages. We’ll use the ExecuteAsync method for that:
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (bc, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            try
            {
                // Add message parsing and handling logic here
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            catch (Exception)
            {
                // Reject this single message without requeueing it
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        };
        _channel.BasicConsume(queue: "<queuename>", autoAck: false, consumer: consumer);
        await Task.CompletedTask;
    }
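The "message parsing and handling logic" placeholder above could be filled in many ways. As one possible sketch, assuming the messages are UTF-8 JSON, the helper below parses the body with System.Text.Json. The OrderCreated record and the MessageParser class are hypothetical names invented for this illustration; your own message shape will differ.

```csharp
using System;
using System.Text.Json;

// Hypothetical message shape, used only for illustration.
public sealed record OrderCreated(int OrderId, string Customer);

public static class MessageParser
{
    // Returns the parsed message, or null when the payload is not valid JSON
    // for this type (or is the JSON literal null).
    public static OrderCreated? TryParse(ReadOnlyMemory<byte> body)
    {
        try
        {
            // Deserializes straight from the UTF-8 bytes, without building a string first.
            return JsonSerializer.Deserialize<OrderCreated>(body.Span);
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

Inside the Received handler you could then call MessageParser.TryParse(ea.Body) and nack the message when the result is null, instead of acknowledging it.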
- Of course we should not forget to register our Hosted Service:
    services.AddHostedService<RabbitMQHostedService>();