自动化 RabbitMQ 消费者测试

Automate RabbitMQ consumer testing

我有一个使用 RabbitMQ 客户端接收消息的 .net 微服务,我需要测试以下内容:

1-消费者成功连接到rabbitMq主机。

2- 消费者正在监听队列。

3- 消费者正在成功接收消息。

为了实现上述目标,我创建了一个发送消息的示例应用程序,我正在调试消费者以确保它正在接收消息。

我怎样才能使这个测试自动化?因此将其包含在我的微服务中 CI.

我正在考虑将我的示例应用程序包含在我的 CI 中,这样我就可以触发消息,然后 运行 消费者单元测试等待特定时间然后在收到消息时通过,但是这对我来说这似乎是一种错误的做法,因为测试要等到消息被触发几秒钟后才会开始。

我想到的另一种方法是从单元测试本身触发示例应用程序,但如果示例应用程序无法运行,则会导致服务故障。

是否有任何通过 RabbitMQ 连接的微服务集成测试的最佳实践?

我成功地做了这样的测试。您需要 RabbitMQ 的测试实例、用于发送消息的测试交换器和用于连接接收消息的测试队列。

不要嘲笑一切!

但是,对于 rabbitMQ 的测试消费者、生产者和测试实例,该测试中没有实际的生产代码。

使用测试rabbitMQ实例和实际应用

为了进行有意义的完整测试,我将使用测试 RabbitMQ 实例、交换器和队列,但保留实际应用程序(生产者和消费者)。

我会实现以下场景

  1. 当测试应用程序执行某些测试消息到 rabbitMQ 的操作时

  2. 那么rabbitMQ接收到的消息数量就增加了

  3. 应用程序在收到消息时执行它应该执行的操作

第 1 步和第 3 步是特定于应用程序的。您的应用程序根据某些外部事件(收到 HTTP 消息?计时器事件?)向 rabbitMQ 发送消息。您可以在测试中重现这种情况,因此应用程序将发送消息(以测试 rabbitMQ 实例)。

收到消息后验证应用程序操作的情况相同。应用程序应该在收到消息后做一些可观察的事情。 如果应用程序进行 HTTP 调用 - 那么您可以模拟该 HTTP 端点并验证收到的消息。如果应用程序将消息保存到数据库中 - 您可以使用数据库来查找您的消息。

使用rabbitMQ监控API

第 2 步可以使用 RabbitMQ 监控来实现 API(有一些方法可以查看从队列接收和使用的消息数量 https://www.rabbitmq.com/monitoring.html#rabbitmq-metrics

考虑使用 spring 引导进行健康检查

如果您是 java-based 然后使用 Spring Boot 将大大简化您的问题。您将自动获得 rabbitMQ 连接的健康检查!

有关如何使用 Spring 启动连接到 RabbitMQ 的教程,请参阅 https://spring.io/guides/gs/messaging-rabbitmq/。 Spring 启动应用程序为每个附加的外部资源(数据库、消息传递、jms 等)公开健康信息(使用 HTTP 端点 /health) 有关详细信息,请参阅 https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html#_auto_configured_healthindicators

如果与 rabbitMQ 的连接断开,那么健康检查(由 org.springframework.boot.actuate.amqp.RabbitHealthIndicator 完成)将 return HTTP 代码 4xx 和 meaninfull json 消息在 JSON 正文中。

你不需要做任何特别的事情来进行健康检查 - 只需使用 org.springframework.boot:spring-boot-starter-amqp 作为 maven/gradle 依赖就足够了。

CI 测试- 来自 src/test 目录

我已经使用集成测试在 src/test 目录中编写了此类测试(连接到 RabbitMQ 的外部测试实例)。如果使用 Spring 启动,最简单的方法是使用测试配置文件,并在 application-test.properties 中有连接的详细信息来测试 RabbitMQ 实例(生产可以使用生产配置文件,而 application-production.properties带有 RabbitMQ 生产实例的文件)。

在最简单的情况下(只需验证与 rabbitMQ 的连接),您只需正常启动应用程序并验证 /health 端点。

在这种情况下,我会执行以下 CI 个步骤

  • 构建(gradle 构建)
  • 一个 运行 单元测试(没有任何外部依赖的测试)
  • 运行 集成测试

CI 测试-外部

上述方法也可以用于部署到测试环境(并连接到测试 rabbitMQ 实例)的应用程序。应用程序启动后,您可以检查 /health 端点以确保它已连接到 rabbitMQ 实例。

如果您让您的应用程序向 rabbitMQ 发送消息,那么您可以观察 rabbbitMQ 指标(使用 rabbitMQ 监控 API)并观察应用程序使用的消息的外部影响。

对于此类测试,您需要从 CI 开始并部署您的应用程序,然后再开始测试。

对于那种情况,我会按照 CI 步骤

  • 构建应用程序的步骤
  • 步骤 运行 src/test 目录中的所有测试(单元、集成)
  • 将应用程序部署到测试环境或启动docker化应用程序的步骤
  • 步骤 运行s 外部测试
  • 对于 docker 化环境,停止 docker 容器的步骤

考虑docker化环境

对于外部测试,您可以 运行 您的应用程序以及 Docker 中的测试 RabbitMQ 实例。您将需要两个 docker 个容器。

对于运行这两张图片,写成docker-compose文件是最合理的。

我已经构建了很多这样的测试。我已经抛出了一些基本代码 GitHub here with .NET Core 2.0.

您将需要一个 RabbitMQ 集群来进行这些自动化测试。每个测试都从消除队列开始,以确保不存在任何消息。来自另一个测试的预先存在的消息将中断当前测试。

我有一个简单的帮手来删除队列。在我的应用程序中,他们总是声明自己的队列,但如果不是你的情况,那么你将不得不再次创建队列以及对任何交换的任何绑定。

public class QueueDestroyer
{
    public static void DeleteQueue(string queueName, string virtualHost)
    {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = "localhost";
        connectionFactory.UserName = "guest";
        connectionFactory.Password = "guest";
        connectionFactory.VirtualHost = virtualHost;
        var connection = connectionFactory.CreateConnection();
        var channel = connection.CreateModel();
        channel.QueueDelete(queueName);
        connection.Close();
    }
}

我创建了一个非常简单的消费者示例来代表您的微服务。它 运行 在任务中直到被取消。

public class Consumer
{
    private IMessageProcessor _messageProcessor;
    private Task _consumerTask;

    public Consumer(IMessageProcessor messageProcessor)
    {
        _messageProcessor = messageProcessor;
    }

    public void Consume(CancellationToken token, string queueName)
    {
        _consumerTask = Task.Run(() =>
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: queueName,
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        _messageProcessor.ProcessMessage(message);
                    };
                    channel.BasicConsume(queue: queueName,
                                        autoAck: false,
                                            consumer: consumer);

                    while (!token.IsCancellationRequested)
                        Thread.Sleep(1000);
                }
            }
        });
    }

    public void WaitForCompletion()
    {
        _consumerTask.Wait();
    }
}

消费者有一个 IMessageProcessor 接口,它将完成处理消息的工作。在我的集成测试中,我创建了一个假的。您可能会为此使用您喜欢的模拟框架。

测试发布者向队列发布消息。

public class TestPublisher
{
    public void Publish(string queueName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                    routingKey: queueName,
                                    basicProperties: null,
                                    body: body);
        }
    }
}

我的示例测试如下所示:

[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
    // ARRANGE
    QueueDestroyer.DeleteQueue("queueX", "/");
    var cts = new CancellationTokenSource();
    var fake = new FakeProcessor();
    var myMicroService = new Consumer(fake);

    // ACT
    myMicroService.Consume(cts.Token, "queueX");

    var producer = new TestPublisher();
    producer.Publish("queueX", "hello");

    Thread.Sleep(1000); // make sure the consumer will have received the message
    cts.Cancel();

    // ASSERT
    Assert.Equal(1, fake.Messages.Count);
    Assert.Equal("hello", fake.Messages[0]);
}

我的假货是这样的:

public class FakeProcessor : IMessageProcessor
{
    public List<string> Messages { get; set; }

    public FakeProcessor()
    {
        Messages = new List<string>();
    }

    public void ProcessMessage(string message)
    {
        Messages.Add(message);
    }
}

额外的建议是:

  • 如果您可以将随机文本附加到队列并在每次测试中交换名称运行,那么这样做可以避免并发测试相互干扰

  • 如果您的应用程序不这样做,我在代码中也有一些帮助程序用于声明队列、交换和绑定。

  • 编写一个连接杀手 class,它将强制关闭连接并检查您的应用程序是否仍在运行并可以恢复。我有这方面的代码,但不是在 .NET Core 中。只要问我,我可以在 .NET Core 中将其修改为 运行。

  • 一般来说,我认为您应该避免在集成测试中包含其他微服务。例如,如果您从一项服务向另一项服务发送消息并期望返回消息,则创建一个可以模拟预期行为的虚假消费者。如果您收到来自其他服务的消息,请在您的集成测试项目中创建虚假发布者。