MassTransit 消费者测试通过但抛出令人困惑的错误
MassTransit Consumer Test Passing But Confusing Error Thrown
我正在尝试使用 MassTransit.Testing 框架和 InMemoryTestHarness 对 MassTransit 消费者进行单元测试。
到目前为止,我能够成功测试为两个不同的消费者发送一条消息。
其中一个消费者也消费成功了,但是报错如下:
R-FAULT loopback://localhost/vhost/input_queue 49820000-5689-0050-3b5c-08d5ecc4708c Acme.Company.Messages.Commands.ISomeCommand Acme.Company.SomeService.Consumers.SomeCommandConsumer(00:00:00.2328493) Failure: The payload was not found: MassTransit.RabbitMqTransport.ModelContext, StackTrace: at GreenPipes.PipeExtensions.GetPayload[TPayload](PipeContext context) at MassTransit.DeferExtensions.Defer[T](ConsumeContext1 context, TimeSpan delay, Action
2 callback)
此时的代码试图将消息延迟一分钟,所以我想知道这是否是丢失负载的原因???
代码如下:
[TestFixture]
public class SomeCommandConsumerTests
{
private InMemoryTestHarness _harness;
private Mock<ISomeRepository> _SomeRepositoryMock;
private Mock<IAnotherRepository> _AnotherRepositoryMock;
[OneTimeSetUp]
public async Task OneTimeInit()
{
_harness = new InMemoryTestHarness("vhost");
_harness.Consumer(() => new SomeCommandConsumer(_SomeRepositoryMock.Object, _AnotherRepositoryMock.Object));
await _harness.Start();
}
[SetUp]
public void Init()
{
_SomeRepositoryMock = new Mock<ISomeRepository>();
_AnotherRepositoryMock = new Mock<IAnotherRepository>();
_SomeRepositoryMock.Setup(x => x.UpdateSomeId(It.IsAny<SomeEnum>(), It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(x => x.UpdateProcMessage(It.IsAny<string>(), It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(
x => x.UpdateSomeProcStartTime(It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(
x => x.UpdateSomeProcEndTime(It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
}
[Test]
public async Task ProcessMessage_MethodCalledWithSomeCondition_MessageSent()
{
//Arrange
_SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Entity
{
Property1 = true,
SomeID = 12345
});
await _harness.InputQueueSendEndpoint.Send(new SomeCommand
{
MessageType = MessageTypeEnum.SomeMessgae,
SomeID = 12345
});
//Assert
_harness.Sent.Select<ISomeCommand>().Any().Should().BeTrue();
}
[Test]
public async Task ProcessMessage_MethodCalledWithSomeCondition_CorrectNextStepReturned()
{
//Arrange
_SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Control()
{
Property1 = true,
SomeID = 12345
});
await _harness.InputQueueSendEndpoint.Send(new SomeCommand
{
MessageType = MessageTypeEnum.SomeMessgae,
SomeID = 12345
});
//Assert
_harness.Consumed.Select<ISomeCommand>().Any().Should().BeTrue();
_harness.Consumed
.Select<ISomeCommand>()
.First()
.Context
.Message
.SomeID
.Should()
.Be(12345);
_harness.Consumed
.Select<ISomeCommand>()
.First()
.Context
.Message
.MessageProcessingResult
.Should()
.Be(MessageProcessingResult.DeferProcessing);
}
[OneTimeTearDown]
public async Task Teardown()
{
await _harness.Stop();
}
}
在消费者中被命中的代码是:
await context.Defer(TimeSpan.FromMinutes(1));
基本上,我错过了什么,这甚至是个问题吗?
发生这种情况是因为您使用的内存测试工具具有 RabbitMQ 支持的功能 (Defer
)。 Defer 尝试使用来自消费者的 RabbitMQ 模型来延迟消息,但它不存在,因为内存中对此一无所知。
如果您想使用更通用的解决方案,请改用 Redeliver
。您需要将 QuartzIntegration 库与内存中测试工具一起使用,但它会使用该调度程序进行内存中消息重新传递。
您还需要更新 RabbitMQ 总线配置以包含 cfg.UseDelayedExchangeMessageScheduler();
,以便 RabbitMQ 用于消息调度。
我正在尝试使用 MassTransit.Testing 框架和 InMemoryTestHarness 对 MassTransit 消费者进行单元测试。
到目前为止,我能够成功测试为两个不同的消费者发送一条消息。
其中一个消费者也消费成功了,但是报错如下:
R-FAULT loopback://localhost/vhost/input_queue 49820000-5689-0050-3b5c-08d5ecc4708c Acme.Company.Messages.Commands.ISomeCommand Acme.Company.SomeService.Consumers.SomeCommandConsumer(00:00:00.2328493) Failure: The payload was not found: MassTransit.RabbitMqTransport.ModelContext, StackTrace: at GreenPipes.PipeExtensions.GetPayload[TPayload](PipeContext context) at MassTransit.DeferExtensions.Defer[T](ConsumeContext
1 context, TimeSpan delay, Action
2 callback)
此时的代码试图将消息延迟一分钟,所以我想知道这是否是丢失负载的原因???
代码如下:
[TestFixture]
public class SomeCommandConsumerTests
{
private InMemoryTestHarness _harness;
private Mock<ISomeRepository> _SomeRepositoryMock;
private Mock<IAnotherRepository> _AnotherRepositoryMock;
[OneTimeSetUp]
public async Task OneTimeInit()
{
_harness = new InMemoryTestHarness("vhost");
_harness.Consumer(() => new SomeCommandConsumer(_SomeRepositoryMock.Object, _AnotherRepositoryMock.Object));
await _harness.Start();
}
[SetUp]
public void Init()
{
_SomeRepositoryMock = new Mock<ISomeRepository>();
_AnotherRepositoryMock = new Mock<IAnotherRepository>();
_SomeRepositoryMock.Setup(x => x.UpdateSomeId(It.IsAny<SomeEnum>(), It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(x => x.UpdateProcMessage(It.IsAny<string>(), It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(
x => x.UpdateSomeProcStartTime(It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(
x => x.UpdateSomeProcEndTime(It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
}
[Test]
public async Task ProcessMessage_MethodCalledWithSomeCondition_MessageSent()
{
//Arrange
_SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Entity
{
Property1 = true,
SomeID = 12345
});
await _harness.InputQueueSendEndpoint.Send(new SomeCommand
{
MessageType = MessageTypeEnum.SomeMessgae,
SomeID = 12345
});
//Assert
_harness.Sent.Select<ISomeCommand>().Any().Should().BeTrue();
}
[Test]
public async Task ProcessMessage_MethodCalledWithSomeCondition_CorrectNextStepReturned()
{
//Arrange
_SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Control()
{
Property1 = true,
SomeID = 12345
});
await _harness.InputQueueSendEndpoint.Send(new SomeCommand
{
MessageType = MessageTypeEnum.SomeMessgae,
SomeID = 12345
});
//Assert
_harness.Consumed.Select<ISomeCommand>().Any().Should().BeTrue();
_harness.Consumed
.Select<ISomeCommand>()
.First()
.Context
.Message
.SomeID
.Should()
.Be(12345);
_harness.Consumed
.Select<ISomeCommand>()
.First()
.Context
.Message
.MessageProcessingResult
.Should()
.Be(MessageProcessingResult.DeferProcessing);
}
[OneTimeTearDown]
public async Task Teardown()
{
await _harness.Stop();
}
}
在消费者中被命中的代码是:
await context.Defer(TimeSpan.FromMinutes(1));
基本上,我错过了什么,这甚至是个问题吗?
发生这种情况是因为您使用的内存测试工具具有 RabbitMQ 支持的功能 (Defer
)。 Defer 尝试使用来自消费者的 RabbitMQ 模型来延迟消息,但它不存在,因为内存中对此一无所知。
如果您想使用更通用的解决方案,请改用 Redeliver
。您需要将 QuartzIntegration 库与内存中测试工具一起使用,但它会使用该调度程序进行内存中消息重新传递。
您还需要更新 RabbitMQ 总线配置以包含 cfg.UseDelayedExchangeMessageScheduler();
,以便 RabbitMQ 用于消息调度。