Rebus - 运行 单元测试并等待 Rebus 完成所有线程

Rebus - Run unit tests and wait for Rebus to complete all threads

我正在构建一个集成测试,我正在使用 InMemNetwork 来 运行 测试。

在断言之前有一个 Thread.Sleep 调用,但这是一种不可靠的测试方式,它会大大降低我们的测试速度。

我也在做一些集成测试,使用 SagaFixtures 和一个简单的 IBus 实现,运行s 是同步的,但注册处理程序、运行ning 处理程序和延迟消息有点乏味。

有没有一种方法可以等待 Rebus 使用的所有线程,直到它们完成执行,而无需使用诸如 ManualResetEvent(在 Rebus 自己的测试中使用)之类的东西来增加生产代码?

我通常像你一样使用SagaFixture,然后我使用FakeBus注入saga处理程序以捕获它们的动作。

虽然我的大部分测试都是简单处理程序的单元测试,但我会经常注入 "real" 服务,例如IThisIThat 的实现转到真实数据库。

虽然我使用 in-mem 传输启动了多个端点,但在某些情况下,我通常会在 InMemNetwork 上实施扩展,以帮助我等待特定事件的发布或其他内容就像那样——它在测试中可能看起来像这样:

var updated = await Network.WaitForNext<WhateverUpdated>(subscriberAddress, timeoutSeconds: 20);

其中 WaitForNext 只是一种扩展方法,它轮询 subscriberAddress 指定的队列以获取下一条消息,并尝试将其反序列化为 WhateverUpdated.

希望能给大家一些启发:)

对于某些场景,我使用以下方法等待 Rebus 完成所有消息处理。 rebus 端点托管在单独的 exe 中,rebus 文件系统传输用于集成测试(通常是 Azure SB)。集成测试启动了 exe,在每个 exe 中,Rebus 都配置了 0 个 worker,所以它什么都不做。然后在测试中我们有一个 WaitForMessagesProcessed() 方法,它配置了一些 worker 和块,直到没有更多的消息需要处理。

代码大致如下:

public class MessageProcessor() {
    private string queueName;
    private int messagesWaitingForProcessing;
    private IBus bus;

    public MessageProcessor(string queueName) {
      this.queueName = queueName;

      this.bus = Configure.With(adapter)
        .Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName))
        .Options(o =>
        {
            o.SetNumberOfWorkers(0);
        })
        .Events(e =>
        {
            e.BeforeMessageSent += (thebus, headers, message, context) =>
            {
                // When sending to itself, the message is not queued on the network.
                var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>();
                if (m.Any(t => t == this.queueName))
                    this.messagesWaitingForProcessing++;
            };

            e.AfterMessageHandled += (thebus, headers, message, context, args) =>
            {
                this.messagesWaitingForProcessing--;
            };
        })
        .Start();
    }

    public async Task WaitForMessagesProcessed()
    {
        this.DetermineMessagesWaitingForProcessing();
        while (this.messagesWaitingForProcessing > 0)
        {
            this.bus.Advanced.Workers.SetNumberOfWorkers(2);

            while (this.messagesWaitingForProcessing > 0)
            {
                await Task.Delay(100);
            }

            this.bus.Advanced.Workers.SetNumberOfWorkers(0);

            this.DetermineMessagesWaitingForProcessing();
        }
    }

    public void DetermineMessagesWaitingForProcessing() {
        this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName),      "*.rebusmessage.json").Count();
    }

    private static string GetDirectoryForQueueNamed(string queueName)
    {
        return Path.Combine(this.baseDiretory, queueName);
    }
}

测试可能像

[TestMethod]
public void Test() {
   var endpoint1 = LaunchExe("1");
   var endpoint2 = LaunchExe("2");

   endPoint1.DoSomeAction();

   endPoint1.WaitForMessagesProcessed();

   Assert.AreEqual("expectation", endPoint1.Query());
}