无法使用内存中的 MassTransit 从单元测试发送消息

Cannot send messages from unit test using in-memory MassTransit

我希望能够使用 MassTransit 的内存中实现将消息从单元测试发送到 ASP.Net 核心应用程序。内存中实现很重要,因为实际应用程序使用 ActiveMQ,但我们希望在测试期间用内存中实现替换它。

我开始使用 .NET 5 从 VS2019 创建一个新的 ASP.Net Core API 项目,并将 MassTransit 添加到 Startup,如下所示:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "WebApplicationWithMassTransit", Version = "v1" });
    });

    services.AddMassTransit(x =>
    {
        x.AddConsumer<MessageConsumer>();
        x.UsingInMemory((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
        });
    });
    services.AddMassTransitHostedService(true);
}

MessageConsumer 看起来像这样:

public class MessageConsumer : IConsumer<Message>
{
    readonly ILogger<MessageConsumer> _logger;

    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Message> context)
    {
        _logger.LogInformation("Received Text: {Text}", context.Message.Text);
        return Task.CompletedTask;
    }
}

Message 看起来像这样:

public class Message
{
    public string Text { get; set; }
}

当我将 IPublishEndpoint 注入 WeatherForecastController 并在 GET 完成后发送消息时使用:

await _endpoint.Publish(new Message {Text = "Test message"});

一切如预期。

但现在我想从单元测试向应用程序发送消息。所以我创建了一个XUnit测试项目并引用了ASP.Net API项目。测试 class 如下所示:

public class UnitTest1
    : IClassFixture<WebApplicationFactory<Startup>>
{
    private readonly WebApplicationFactory<Startup> _factory;

    public UnitTest1(WebApplicationFactory<Startup> factory)
    {
       _factory = factory;
    }

    [Fact]
    public async Task SendMassTransitMessage()
    {
        var client = _factory.CreateClient();

        using var scope = _factory.Services.CreateScope();
        var bus = scope.ServiceProvider.GetRequiredService<IBus>();

        await bus.Publish(new Message {Text = "Test message from test"});
    }
}

当我运行测试时,它成功了,但是没有调用消费者。在调试 window 中,我得到以下堆栈跟踪:

MassTransit: Information: Configured endpoint Message, Consumer: WebApplicationWithMassTransit.MessageConsumer
MassTransit: Information: Bus started: loopback://localhost/
MassTransit.ReceiveTransport: Error: R-FAULT loopback://localhost/Message af0f0000-50b6-c8f7-16d4-08d9affc5a91 WebApplicationWithMassTransit.Message WebApplicationWithMassTransit.MessageConsumer(00:00:00.0072326)

System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ThrowHelper.ThrowObjectDisposedException()
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.CreateScope(IServiceProvider provider)
   at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionConsumerScopeProvider.MassTransit.Scoping.IConsumerScopeProvider.GetScope[TConsumer,T](ConsumeContext`1 context)
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
MassTransit: Information: Bus stopped: loopback://localhost/

是否可以从这样的单元测试中发送消息?我做错了什么?我尝试以多种方式解析 IBusIBusControlIPublishEndpoint,创建范围和不创建范围。

您正在等待发布,但此时,测试退出并开始拆除应用程序。您需要等到消费者完成,因为它是异步调用的。使用内存中测试工具时,可以使用监视器等到没有总线 activity,但我不知道在上面的场景中你会怎么做。

您期望如何断言消费者实际完成了工作?

为了证明这一点,添加一个简单的await Task.Delay(1000)并查看消费者是否执行。