After blogging about adding a header yesterday, today let's look at how we can read this header in a generic way (if you only want to do this for a specific consumer, take a look at this blog post from last week).
We’ll start by creating the filter:
    public class ConsumeUserHeaderFilter<T> : IFilter<ConsumerConsumeContext<T>> where T : class
    {
        public void Probe(ProbeContext context)
        {
            context.CreateFilterScope("consumeuserheader");
        }

        public Task Send(ConsumerConsumeContext<T> context, IPipe<ConsumerConsumeContext<T>> next)
        {
            // Access the scoped IoC container through the payload
            var serviceProvider = context.GetPayload<IServiceProvider>();
            var userFactory = serviceProvider.GetRequiredService<IUserFactory>();

            // If the message carries a "user" header, store it as the current user for this scope
            if (context.Headers.TryGetHeader("user", out object user))
            {
                if (userFactory is ScopedUserFactory scopedFactory)
                    scopedFactory.SetCurrentUser(new User { Name = user.ToString() });
            }

            // Hand the message on to the next filter in the pipeline
            return next.Send(context);
        }
    }
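The filter relies on three types that are not shown in this post: IUserFactory, ScopedUserFactory and User. As a rough sketch only, here is one shape they could have, inferred purely from how the filter uses them. The GetCurrentUser method and the null check are assumptions added for illustration; only SetCurrentUser and the Name property appear in the filter above.

```csharp
using System;

// Hypothetical shapes, inferred from how the filter uses these types.
public class User
{
    public string Name { get; set; } = string.Empty;
}

public interface IUserFactory
{
    // Assumed read side: the post only shows the write side (SetCurrentUser).
    User GetCurrentUser();
}

public class ScopedUserFactory : IUserFactory
{
    private User _currentUser;

    // Called by the filter once the "user" header has been read.
    public void SetCurrentUser(User user)
    {
        _currentUser = user ?? throw new ArgumentNullException(nameof(user));
    }

    // Returns null when no "user" header was present on the message.
    public User GetCurrentUser() => _currentUser;
}
```

For the consumer to see the user that the filter has set, the factory would presumably have to be registered with a scoped lifetime, so that the filter and the consumer resolve the same instance within one message scope.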
Now we need to register this filter generically so that it is applied to every consumer. The trick is to create an IConsumerConfigurationObserver implementation. This observer is invoked for every consumer that is configured, which makes it a perfect fit for our use case:
    class SecurityEndpointConfigurationObserver : IConsumerConfigurationObserver
    {
        public void ConsumerConfigured<TConsumer>(IConsumerConfigurator<TConsumer> configurator) where TConsumer : class
        {
            // Attach the header filter to every consumer as it is configured
            configurator.UseFilter(new ConsumeUserHeaderFilter<TConsumer>());
        }

        public void ConsumerMessageConfigured<TConsumer, TMessage>(IConsumerMessageConfigurator<TConsumer, TMessage> configurator)
            where TConsumer : class
            where TMessage : class
        {
            // Nothing to do per message type
        }
    }
Of course we are not there yet. We still need a way to tell MassTransit to apply this observer. This can be done by calling the ConnectConsumerConfigurationObserver method on the IBusFactoryConfigurator:
    container.AddMassTransit(config =>
    {
        // Removed the other config settings
        config.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConnectConsumerConfigurationObserver(new SecurityEndpointConfigurationObserver());
        });
    });