On one of my projects, we were using a rather old version of MassTransit (version 3.5). As we needed some of the newer features, we decided it was time for an upgrade.
After updating all our NuGet packages, everything still compiled. Unfortunately, when we tried to run our unit tests, every test that used the built-in TestFramework timed out.
Here is the code we were using:
using MassTransit; | |
using MassTransit.TestFramework; | |
using MassTransit.Testing.Observers; | |
using NUnit.Framework; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace Test.Messaging
{
    /// <summary>
    /// Publishes a <see cref="SampleMessage"/> on the in-memory test bus and checks that a
    /// consume observer connected to the bus reports it.
    /// </summary>
    /// <remarks>
    /// No consumer is registered on the receive endpoint in this fixture; the observer is
    /// the only thing waiting for the message.
    /// </remarks>
    [TestFixture]
    public class ConsumerTests : InMemoryTestFixture
    {
        // Single source of truth for the order number that is published and later asserted.
        private const int ExpectedOrderNumber = 1000;

        // Must observe the same message type that Before() publishes; observing a different
        // type would leave PostConsumed waiting for a message that is never sent.
        private TestConsumeMessageObserver<SampleMessage> _observer;

        /// <summary>
        /// Connects the consume observer to the bus and publishes the message under test.
        /// </summary>
        [OneTimeSetUp]
        public async Task Before()
        {
            _observer = GetConsumeObserver<SampleMessage>();
            Bus.ConnectConsumeMessageObserver(_observer);

            await Bus.Publish(new SampleMessage
            {
                CorrelationId = Guid.NewGuid(),
                SubmitDate = DateTime.Now,
                OrderNumber = ExpectedOrderNumber
            });
        }

        /// <summary>
        /// Waits for the observer to report the consumed message and verifies its order number.
        /// </summary>
        [Test]
        public async Task Consume_Should_Receive_And_ProcessPublishedMessage()
        {
            var orderMessage = await _observer.PostConsumed;

            Assert.AreEqual(ExpectedOrderNumber, orderMessage.OrderNumber);
        }
    }
}
The problem is that the following line keeps waiting for an incoming message:
var orderMessage = await _observer.PostConsumed;
This code worked perfectly in MassTransit 3.5, but it no longer works in MassTransit 5.0.
So what’s changed to cause this behavior?
Starting from MassTransit 4, the in-memory message fabric used by the InMemoryTestFixture behaves the same as RabbitMQ. Previously, every message was published to every endpoint, regardless of whether or not there was a binding for that message type. Now, if there is no binding, the message isn't delivered.
To fix it, we’ll have to add at least one consumer that listens to the specified message type. Here is the altered code:
using MassTransit; | |
using MassTransit.TestFramework; | |
using MassTransit.Testing.Observers; | |
using NUnit.Framework; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace Test.Messaging
{
    /// <summary>
    /// Publishes a <see cref="SampleMessage"/> on the in-memory test bus and checks that a
    /// consume observer connected to the bus reports it once a consumer has handled it.
    /// </summary>
    [TestFixture]
    public class ConsumerTests : InMemoryTestFixture
    {
        // Single source of truth for the order number that is published and later asserted.
        private const int ExpectedOrderNumber = 1000;

        // The observer, the registered consumer and the published message must all use the
        // same message type; otherwise PostConsumed waits for a message that never arrives.
        private TestConsumeMessageObserver<SampleMessage> _observer;

        /// <summary>
        /// Connects the consume observer to the bus and publishes the message under test.
        /// </summary>
        [OneTimeSetUp]
        public async Task Before()
        {
            _observer = GetConsumeObserver<SampleMessage>();
            Bus.ConnectConsumeMessageObserver(_observer);

            await Bus.Publish(new SampleMessage
            {
                CorrelationId = Guid.NewGuid(),
                SubmitDate = DateTime.Now,
                OrderNumber = ExpectedOrderNumber
            });
        }

        /// <summary>
        /// Registers a consumer for the published message type on the in-memory receive endpoint.
        /// </summary>
        /// <param name="configurator">Configurator of the fixture's in-memory receive endpoint.</param>
        protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
        {
            base.ConfigureInMemoryReceiveEndpoint(configurator);

            // Add at least one consumer to make the observer behavior work.
            configurator.Consumer<SampleMessageConsumer>();
        }

        /// <summary>
        /// Consumer that accepts <see cref="SampleMessage"/> and does nothing with it.
        /// </summary>
        public class SampleMessageConsumer : IConsumer<SampleMessage>
        {
            /// <summary>Completes immediately without processing the message.</summary>
            /// <param name="context">Consume context of the received message (unused).</param>
            /// <returns>An already completed task.</returns>
            public Task Consume(ConsumeContext<SampleMessage> context)
            {
                return Task.CompletedTask;
            }
        }

        /// <summary>
        /// Waits for the observer to report the consumed message and verifies its order number.
        /// </summary>
        [Test]
        public async Task Consume_Should_Receive_And_ProcessPublishedMessage()
        {
            var orderMessage = await _observer.PostConsumed;

            Assert.AreEqual(ExpectedOrderNumber, orderMessage.OrderNumber);
        }
    }
}
We override the ConfigureInMemoryReceiveEndpoint method and register a no-op consumer, so that a binding exists for the message type and the published message is actually delivered.