如何让 CreateUsingInMemory() 与 MassTransit 一起工作?

How do I get CreateUsingInMemory() to work with MassTransit?

这是我的全部代码。我认为测试应该通过,但它失败了。我已经(未成功)尝试使用一些重载 Consumer.

using MassTransit;
using NUnit.Framework;
using System.Threading.Tasks;

namespace MassTransitTests
{
    public class Message
    {
    }

    public class MessageConsumer : IConsumer<Message>
    {
        public static int ConsumedCount
        {
            get;
            private set;
        }

        public Task Consume(ConsumeContext<Message> context)
        {
            ConsumedCount++;
            return Task.FromResult(0);
        }
    }

    [TestFixture]
    public class MassTransitTest
    {
        [Test]
        public async Task BasicTestAsync()
        {
            // Arrange
            var control = Bus.Factory.CreateUsingInMemory(configure =>
            {
                configure.ReceiveEndpoint("myQueue", endpoint =>
                {
                    endpoint.Consumer<MessageConsumer>();
                });
            });

            // Act
            using (var handle = control.Start())
            {
                await control.Publish(new Message());
                await control.Publish(new Message());
            }

            // Assert
            Assert.That(MessageConsumer.ConsumedCount, Is.EqualTo(2));
        }
    }
}

他们的documentation展示了这一点,这就是我正在做的:

var busControl = Bus.Factory.CreateUsingInMemory(cfg =>
{
    cfg.ReceiveEndpoint("queue_name", ep =>
    {
       //configure the endpoint
    })
});

我在做什么wrong/what我确实需要更改我的 Arrange/Act 才能让我的 Assert 工作?

在深入研究他们的测试后,我发现了我所缺少的东西:

[1] 你需要*到 await BusHandle.Ready,我没有这样做。 *(测试在没有这个的情况下工作 - 至少我第一次 运行 它,但这可能只是一个对我有利的竞争条件......)

[2] 只要总线收到我猜测的消息,对 Publish 的调用显然就会完成 - 而不是当消息的 handlers/consumers 完成它们的工作时。因此,如果您要测试的是处理程序,则需要通知调用代码处理程序已完成。这是执行此操作的一种方法 - 使用 TaskCompletionSource<T>(类似于我在他们的代码库中找到的)。显然,我的线程安全性可能并不完美,而且我的 lock 用法有点像大锤,但这说明了这一点:

using MassTransit;
using NUnit.Framework;
using System.Threading.Tasks;

namespace MassTransitTests
{
    public class Message
    {
    }

    public class MessageConsumer : IConsumer<Message>
    {        
        public static int TargetConsumedCount
        {
            get { return _targetConsumedCount; }
            set
            {
                lock (_lock)
                {
                    _targetConsumedCount = value;
                    CheckTargetReached();
                }
            }
        }

        private static void CheckTargetReached()
        {
            if (_consumedCount >= TargetConsumedCount)
            {
                _targetReached.SetResult(true);
            }
        }

        public static Task<bool> TargetReached { get; private set; }

        private static int _consumedCount;
        private static int _targetConsumedCount;
        private static TaskCompletionSource<bool> _targetReached;
        private static object _lock;

        static MessageConsumer()
        {
            _lock = new object();
            _targetReached = new TaskCompletionSource<bool>();
            TargetReached = _targetReached.Task;
        }

        public Task Consume(ConsumeContext<Message> context)
        {
            lock (_lock)
            {
                _consumedCount++;
                CheckTargetReached();
            }
            return Task.FromResult(0);
        }
    }

    [TestFixture]
    public class MassTransitTest
    {
        [Test]
        public async Task BasicTestAsync()
        {
            // Arrange
            var control = Bus.Factory.CreateUsingInMemory(configure =>
            {
                configure.ReceiveEndpoint("myQueue", endpoint =>
                {
                    endpoint.Consumer<MessageConsumer>();
                });
            });

            using (var handle = control.Start())
            {
                await handle.Ready; // [1]

                // Act
                await control.Publish(new Message());
                await control.Publish(new Message());

                // Assert
                MessageConsumer.TargetConsumedCount = 2;
                await MessageConsumer.TargetReached; // [2]
            }               
        }
    }
}