MassTransit offers a lot of flexibility and it is easy to tweak it to your needs.
With all this power it is not always easy to find out what is the best way to solve a specific problem. I wanted to include a header with information about the user in every message.
I solved it by adding two filters; one for sending messages and one for publishing messages:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class PublishUserHeaderFilter<T> : IFilter<PublishContext<T>> where T : class | |
{ | |
private readonly IUserFactory _userFactory; | |
public PublishUserHeaderFilter(IUserFactory userFactory) | |
{ | |
_userFactory = userFactory; | |
} | |
public void Probe(ProbeContext context) | |
{ | |
context.CreateFilterScope("publishuserheader"); | |
} | |
public Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next) | |
{ | |
context.Headers.Set("user", _userFactory.GetCurrentUser()); | |
return next.Send(context); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SendUserHeaderFilter<T> : IFilter<SendContext<T>> where T : class | |
{ | |
private readonly IUserFactory _userFactory; | |
public SendUserHeaderFilter(IUserFactory userFactory) | |
{ | |
_userFactory = userFactory; | |
} | |
public void Probe(ProbeContext context) | |
{ | |
context.CreateFilterScope("senduserheader"); | |
} | |
public Task Send(SendContext<T> context, IPipe<SendContext<T>> next) | |
{ | |
context.Headers.Set("user", _userFactory.GetCurrentUser()); | |
return next.Send(context); | |
} | |
} |
Inside these filters I resolved an IUserFactory which gave me access to the underlying user. This is how I implemented this in ASP.NET Core:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class HttpContextUserFactory : IUserFactory | |
{ | |
private readonly IHttpContextAccessor _httpContextAccessor; | |
public HttpContextUserFactory(IHttpContextAccessor httpContextAccessor) | |
{ | |
_httpContextAccessor = httpContextAccessor; | |
} | |
public IUser GetCurrentUser() | |
{ | |
var httpUser = _httpContextAccessor?.HttpContext?.User; | |
if (httpUser == null) | |
return new UnknownUser(); | |
return new WebUser(httpUser); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface IUserFactory | |
{ | |
IUser GetCurrentUser(); | |
} |
I registered both filters in the IoC container(Autofac in our case) to make sure that the dependencies are correctly resolved:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var container=new ContainerBuilder(); | |
container.RegisterGeneric(typeof(SendUserHeaderFilter<>)).InstancePerLifetimeScope(); | |
container.RegisterGeneric(typeof(PublishUserHeaderFilter<>)).InstancePerLifetimeScope(); |
And as a last step I configured MassTransit to use these filters:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
container.AddMassTransit(config => | |
{ | |
//Removed the other config settings | |
config.UsingRabbitMq((context, cfg) => | |
{ | |
cfg.UsePublishFilter(typeof(PublishUserHeaderFilter<>), context); | |
cfg.UseSendFilter(typeof(SendUserHeaderFilter<>), context); | |
} | |
} |