Rebus - 运行 单元测试并等待 Rebus 完成所有线程
Rebus - Run unit tests and wait for Rebus to complete all threads
我正在构建一个集成测试,我正在使用 InMemNetwork 来 运行 测试。
在断言之前有一个 Thread.Sleep 调用,但这是一种不可靠的测试方式,它会大大降低我们的测试速度。
我也在做一些集成测试,使用 SagaFixtures 和一个简单的 IBus 实现,运行s 是同步的,但注册处理程序、运行ning 处理程序和延迟消息有点乏味。
有没有一种方法可以等待 Rebus 使用的所有线程,直到它们完成执行,而无需使用诸如 ManualResetEvent(在 Rebus 自己的测试中使用)之类的东西来增加生产代码?
我通常像你一样使用SagaFixture
,然后我使用FakeBus
注入saga处理程序以捕获它们的动作。
虽然我的大部分测试都是简单处理程序的单元测试,但我会经常注入 "real" 服务,例如IThis
和 IThat
的实现转到真实数据库。
虽然我使用 in-mem 传输启动了多个端点,但在某些情况下,我通常会在 InMemNetwork
上实施扩展,以帮助我等待特定事件的发布或其他内容就像那样——它在测试中可能看起来像这样:
var updated = await Network.WaitForNext<WhateverUpdated>(subscriberAddress, timeoutSeconds: 20);
其中 WaitForNext
只是一种扩展方法,它轮询 subscriberAddress
指定的队列以获取下一条消息,并尝试将其反序列化为 WhateverUpdated
.
希望能给大家一些启发:)
对于某些场景,我使用以下方法等待 Rebus 完成所有消息处理。 rebus 端点托管在单独的 exe 中,rebus 文件系统传输用于集成测试(通常是 Azure SB)。集成测试启动了 exe,在每个 exe 中,Rebus 都配置了 0 个 worker,所以它什么都不做。然后在测试中我们有一个 WaitForMessagesProcessed() 方法,它配置了一些 worker 和块,直到没有更多的消息需要处理。
代码大致如下:
public class MessageProcessor() {
private string queueName;
private int messagesWaitingForProcessing;
private IBus bus;
public MessageProcessor(string queueName) {
this.queueName = queueName;
this.bus = Configure.With(adapter)
.Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName))
.Options(o =>
{
o.SetNumberOfWorkers(0);
})
.Events(e =>
{
e.BeforeMessageSent += (thebus, headers, message, context) =>
{
// When sending to itself, the message is not queued on the network.
var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>();
if (m.Any(t => t == this.queueName))
this.messagesWaitingForProcessing++;
};
e.AfterMessageHandled += (thebus, headers, message, context, args) =>
{
this.messagesWaitingForProcessing--;
};
})
.Start();
}
public async Task WaitForMessagesProcessed()
{
this.DetermineMessagesWaitingForProcessing();
while (this.messagesWaitingForProcessing > 0)
{
this.bus.Advanced.Workers.SetNumberOfWorkers(2);
while (this.messagesWaitingForProcessing > 0)
{
await Task.Delay(100);
}
this.bus.Advanced.Workers.SetNumberOfWorkers(0);
this.DetermineMessagesWaitingForProcessing();
}
}
public void DetermineMessagesWaitingForProcessing() {
this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName), "*.rebusmessage.json").Count();
}
private static string GetDirectoryForQueueNamed(string queueName)
{
return Path.Combine(this.baseDiretory, queueName);
}
}
测试可能像
[TestMethod]
public void Test() {
var endpoint1 = LaunchExe("1");
var endpoint2 = LaunchExe("2");
endPoint1.DoSomeAction();
endPoint1.WaitForMessagesProcessed();
Assert.AreEqual("expectation", endPoint1.Query());
}
我正在构建一个集成测试,我正在使用 InMemNetwork 来 运行 测试。
在断言之前有一个 Thread.Sleep 调用,但这是一种不可靠的测试方式,它会大大降低我们的测试速度。
我也在做一些集成测试,使用 SagaFixtures 和一个简单的 IBus 实现,运行s 是同步的,但注册处理程序、运行ning 处理程序和延迟消息有点乏味。
有没有一种方法可以等待 Rebus 使用的所有线程,直到它们完成执行,而无需使用诸如 ManualResetEvent(在 Rebus 自己的测试中使用)之类的东西来增加生产代码?
我通常像你一样使用SagaFixture
,然后我使用FakeBus
注入saga处理程序以捕获它们的动作。
虽然我的大部分测试都是简单处理程序的单元测试,但我会经常注入 "real" 服务,例如IThis
和 IThat
的实现转到真实数据库。
虽然我使用 in-mem 传输启动了多个端点,但在某些情况下,我通常会在 InMemNetwork
上实施扩展,以帮助我等待特定事件的发布或其他内容就像那样——它在测试中可能看起来像这样:
var updated = await Network.WaitForNext<WhateverUpdated>(subscriberAddress, timeoutSeconds: 20);
其中 WaitForNext
只是一种扩展方法,它轮询 subscriberAddress
指定的队列以获取下一条消息,并尝试将其反序列化为 WhateverUpdated
.
希望能给大家一些启发:)
对于某些场景,我使用以下方法等待 Rebus 完成所有消息处理。 rebus 端点托管在单独的 exe 中,rebus 文件系统传输用于集成测试(通常是 Azure SB)。集成测试启动了 exe,在每个 exe 中,Rebus 都配置了 0 个 worker,所以它什么都不做。然后在测试中我们有一个 WaitForMessagesProcessed() 方法,它配置了一些 worker 和块,直到没有更多的消息需要处理。
代码大致如下:
public class MessageProcessor() {
private string queueName;
private int messagesWaitingForProcessing;
private IBus bus;
public MessageProcessor(string queueName) {
this.queueName = queueName;
this.bus = Configure.With(adapter)
.Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName))
.Options(o =>
{
o.SetNumberOfWorkers(0);
})
.Events(e =>
{
e.BeforeMessageSent += (thebus, headers, message, context) =>
{
// When sending to itself, the message is not queued on the network.
var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>();
if (m.Any(t => t == this.queueName))
this.messagesWaitingForProcessing++;
};
e.AfterMessageHandled += (thebus, headers, message, context, args) =>
{
this.messagesWaitingForProcessing--;
};
})
.Start();
}
public async Task WaitForMessagesProcessed()
{
this.DetermineMessagesWaitingForProcessing();
while (this.messagesWaitingForProcessing > 0)
{
this.bus.Advanced.Workers.SetNumberOfWorkers(2);
while (this.messagesWaitingForProcessing > 0)
{
await Task.Delay(100);
}
this.bus.Advanced.Workers.SetNumberOfWorkers(0);
this.DetermineMessagesWaitingForProcessing();
}
}
public void DetermineMessagesWaitingForProcessing() {
this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName), "*.rebusmessage.json").Count();
}
private static string GetDirectoryForQueueNamed(string queueName)
{
return Path.Combine(this.baseDiretory, queueName);
}
}
测试可能像
[TestMethod]
public void Test() {
var endpoint1 = LaunchExe("1");
var endpoint2 = LaunchExe("2");
endPoint1.DoSomeAction();
endPoint1.WaitForMessagesProcessed();
Assert.AreEqual("expectation", endPoint1.Query());
}