如何让 CreateUsingInMemory() 与 MassTransit 一起工作?
How do I get CreateUsingInMemory() to work with MassTransit?
这是我的全部代码。我认为测试应该通过,但它失败了。我已经(未成功)尝试使用一些重载 Consumer
.
using MassTransit;
using NUnit.Framework;
using System.Threading.Tasks;
namespace MassTransitTests
{
public class Message
{
}
public class MessageConsumer : IConsumer<Message>
{
public static int ConsumedCount
{
get;
private set;
}
public Task Consume(ConsumeContext<Message> context)
{
ConsumedCount++;
return Task.FromResult(0);
}
}
[TestFixture]
public class MassTransitTest
{
[Test]
public async Task BasicTestAsync()
{
// Arrange
var control = Bus.Factory.CreateUsingInMemory(configure =>
{
configure.ReceiveEndpoint("myQueue", endpoint =>
{
endpoint.Consumer<MessageConsumer>();
});
});
// Act
using (var handle = control.Start())
{
await control.Publish(new Message());
await control.Publish(new Message());
}
// Assert
Assert.That(MessageConsumer.ConsumedCount, Is.EqualTo(2));
}
}
}
他们的documentation展示了这一点,这就是我正在做的:
var busControl = Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.ReceiveEndpoint("queue_name", ep =>
{
//configure the endpoint
})
});
我在做什么wrong/what我确实需要更改我的 Arrange/Act 才能让我的 Assert 工作?
在深入研究他们的测试后,我发现了我所缺少的东西:
[1] 你需要*到 await
BusHandle.Ready
,我没有这样做。 *(测试在没有这个的情况下工作 - 至少我第一次 运行 它,但这可能只是一个对我有利的竞争条件......)
[2] 只要总线收到我猜测的消息,对 Publish
的调用显然就会完成 - 而不是当消息的 handlers/consumers 完成它们的工作时。因此,如果您要测试的是处理程序,则需要通知调用代码处理程序已完成。这是执行此操作的一种方法 - 使用 TaskCompletionSource<T>
(类似于我在他们的代码库中找到的)。显然,我的线程安全性可能并不完美,而且我的 lock
用法有点像大锤,但这说明了这一点:
using MassTransit;
using NUnit.Framework;
using System.Threading.Tasks;
namespace MassTransitTests
{
public class Message
{
}
public class MessageConsumer : IConsumer<Message>
{
public static int TargetConsumedCount
{
get { return _targetConsumedCount; }
set
{
lock (_lock)
{
_targetConsumedCount = value;
CheckTargetReached();
}
}
}
private static void CheckTargetReached()
{
if (_consumedCount >= TargetConsumedCount)
{
_targetReached.SetResult(true);
}
}
public static Task<bool> TargetReached { get; private set; }
private static int _consumedCount;
private static int _targetConsumedCount;
private static TaskCompletionSource<bool> _targetReached;
private static object _lock;
static MessageConsumer()
{
_lock = new object();
_targetReached = new TaskCompletionSource<bool>();
TargetReached = _targetReached.Task;
}
public Task Consume(ConsumeContext<Message> context)
{
lock (_lock)
{
_consumedCount++;
CheckTargetReached();
}
return Task.FromResult(0);
}
}
[TestFixture]
public class MassTransitTest
{
[Test]
public async Task BasicTestAsync()
{
// Arrange
var control = Bus.Factory.CreateUsingInMemory(configure =>
{
configure.ReceiveEndpoint("myQueue", endpoint =>
{
endpoint.Consumer<MessageConsumer>();
});
});
using (var handle = control.Start())
{
await handle.Ready; // [1]
// Act
await control.Publish(new Message());
await control.Publish(new Message());
// Assert
MessageConsumer.TargetConsumedCount = 2;
await MessageConsumer.TargetReached; // [2]
}
}
}
}
这是我的全部代码。我认为测试应该通过,但它失败了。我已经(未成功)尝试使用一些重载 Consumer
.
using MassTransit;
using NUnit.Framework;
using System.Threading.Tasks;
namespace MassTransitTests
{
public class Message
{
}
public class MessageConsumer : IConsumer<Message>
{
public static int ConsumedCount
{
get;
private set;
}
public Task Consume(ConsumeContext<Message> context)
{
ConsumedCount++;
return Task.FromResult(0);
}
}
[TestFixture]
public class MassTransitTest
{
[Test]
public async Task BasicTestAsync()
{
// Arrange
var control = Bus.Factory.CreateUsingInMemory(configure =>
{
configure.ReceiveEndpoint("myQueue", endpoint =>
{
endpoint.Consumer<MessageConsumer>();
});
});
// Act
using (var handle = control.Start())
{
await control.Publish(new Message());
await control.Publish(new Message());
}
// Assert
Assert.That(MessageConsumer.ConsumedCount, Is.EqualTo(2));
}
}
}
他们的documentation展示了这一点,这就是我正在做的:
var busControl = Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.ReceiveEndpoint("queue_name", ep =>
{
//configure the endpoint
})
});
我在做什么wrong/what我确实需要更改我的 Arrange/Act 才能让我的 Assert 工作?
在深入研究他们的测试后,我发现了我所缺少的东西:
[1] 你需要*到 await
BusHandle.Ready
,我没有这样做。 *(测试在没有这个的情况下工作 - 至少我第一次 运行 它,但这可能只是一个对我有利的竞争条件......)
[2] 只要总线收到我猜测的消息,对 Publish
的调用显然就会完成 - 而不是当消息的 handlers/consumers 完成它们的工作时。因此,如果您要测试的是处理程序,则需要通知调用代码处理程序已完成。这是执行此操作的一种方法 - 使用 TaskCompletionSource<T>
(类似于我在他们的代码库中找到的)。显然,我的线程安全性可能并不完美,而且我的 lock
用法有点像大锤,但这说明了这一点:
using MassTransit;
using NUnit.Framework;
using System.Threading.Tasks;
namespace MassTransitTests
{
public class Message
{
}
public class MessageConsumer : IConsumer<Message>
{
public static int TargetConsumedCount
{
get { return _targetConsumedCount; }
set
{
lock (_lock)
{
_targetConsumedCount = value;
CheckTargetReached();
}
}
}
private static void CheckTargetReached()
{
if (_consumedCount >= TargetConsumedCount)
{
_targetReached.SetResult(true);
}
}
public static Task<bool> TargetReached { get; private set; }
private static int _consumedCount;
private static int _targetConsumedCount;
private static TaskCompletionSource<bool> _targetReached;
private static object _lock;
static MessageConsumer()
{
_lock = new object();
_targetReached = new TaskCompletionSource<bool>();
TargetReached = _targetReached.Task;
}
public Task Consume(ConsumeContext<Message> context)
{
lock (_lock)
{
_consumedCount++;
CheckTargetReached();
}
return Task.FromResult(0);
}
}
[TestFixture]
public class MassTransitTest
{
[Test]
public async Task BasicTestAsync()
{
// Arrange
var control = Bus.Factory.CreateUsingInMemory(configure =>
{
configure.ReceiveEndpoint("myQueue", endpoint =>
{
endpoint.Consumer<MessageConsumer>();
});
});
using (var handle = control.Start())
{
await handle.Ready; // [1]
// Act
await control.Publish(new Message());
await control.Publish(new Message());
// Assert
MessageConsumer.TargetConsumedCount = 2;
await MessageConsumer.TargetReached; // [2]
}
}
}
}