自动化 RabbitMQ 消费者测试
Automate RabbitMQ consumer testing
我有一个使用 RabbitMQ 客户端接收消息的 .net 微服务,我需要测试以下内容:
1-消费者成功连接到rabbitMq主机。
2- 消费者正在监听队列。
3- 消费者正在成功接收消息。
为了实现上述目标,我创建了一个发送消息的示例应用程序,我正在调试消费者以确保它正在接收消息。
我怎样才能使这个测试自动化?因此将其包含在我的微服务中 CI.
我正在考虑将我的示例应用程序包含在我的 CI 中,这样我就可以触发消息,然后 运行 消费者单元测试等待特定时间然后在收到消息时通过,但是这对我来说这似乎是一种错误的做法,因为测试要等到消息被触发几秒钟后才会开始。
我想到的另一种方法是从单元测试本身触发示例应用程序,但如果示例应用程序无法运行,则会导致服务故障。
是否有任何通过 RabbitMQ 连接的微服务集成测试的最佳实践?
我成功地做了这样的测试。您需要 RabbitMQ 的测试实例、用于发送消息的测试交换器和用于连接接收消息的测试队列。
不要嘲笑一切!
但是,对于 rabbitMQ 的测试消费者、生产者和测试实例,该测试中没有实际的生产代码。
使用测试rabbitMQ实例和实际应用
为了进行有意义的完整测试,我将使用测试 RabbitMQ 实例、交换器和队列,但保留实际应用程序(生产者和消费者)。
我会实现以下场景
当测试应用程序执行某些测试消息到 rabbitMQ 的操作时
那么rabbitMQ接收到的消息数量就增加了
应用程序在收到消息时执行它应该执行的操作
第 1 步和第 3 步是特定于应用程序的。您的应用程序根据某些外部事件(收到 HTTP 消息?计时器事件?)向 rabbitMQ 发送消息。您可以在测试中重现这种情况,因此应用程序将发送消息(以测试 rabbitMQ 实例)。
收到消息后验证应用程序操作的情况相同。应用程序应该在收到消息后做一些可观察的事情。
如果应用程序进行 HTTP 调用 - 那么您可以模拟该 HTTP 端点并验证收到的消息。如果应用程序将消息保存到数据库中 - 您可以使用数据库来查找您的消息。
使用rabbitMQ监控API
第 2 步可以使用 RabbitMQ 监控来实现 API(有一些方法可以查看从队列接收和使用的消息数量 https://www.rabbitmq.com/monitoring.html#rabbitmq-metrics)
考虑使用 spring 引导进行健康检查
如果您是 java-based 然后使用 Spring Boot 将大大简化您的问题。您将自动获得 rabbitMQ 连接的健康检查!
有关如何使用 Spring 启动连接到 RabbitMQ 的教程,请参阅 https://spring.io/guides/gs/messaging-rabbitmq/。
Spring 启动应用程序为每个附加的外部资源(数据库、消息传递、jms 等)公开健康信息(使用 HTTP 端点 /health)
有关详细信息,请参阅 https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html#_auto_configured_healthindicators。
如果与 rabbitMQ 的连接断开,那么健康检查(由 org.springframework.boot.actuate.amqp.RabbitHealthIndicator 完成)将 return HTTP 代码 4xx 和 meaninfull json 消息在 JSON 正文中。
你不需要做任何特别的事情来进行健康检查 - 只需使用 org.springframework.boot:spring-boot-starter-amqp 作为 maven/gradle 依赖就足够了。
CI 测试- 来自 src/test 目录
我已经使用集成测试在 src/test 目录中编写了此类测试(连接到 RabbitMQ 的外部测试实例)。如果使用 Spring 启动,最简单的方法是使用测试配置文件,并在 application-test.properties 中有连接的详细信息来测试 RabbitMQ 实例(生产可以使用生产配置文件,而 application-production.properties带有 RabbitMQ 生产实例的文件)。
在最简单的情况下(只需验证与 rabbitMQ 的连接),您只需正常启动应用程序并验证 /health 端点。
在这种情况下,我会执行以下 CI 个步骤
- 构建(gradle 构建)
- 一个 运行 单元测试(没有任何外部依赖的测试)
- 运行 集成测试
CI 测试-外部
上述方法也可以用于部署到测试环境(并连接到测试 rabbitMQ 实例)的应用程序。应用程序启动后,您可以检查 /health 端点以确保它已连接到 rabbitMQ 实例。
如果您让您的应用程序向 rabbitMQ 发送消息,那么您可以观察 rabbbitMQ 指标(使用 rabbitMQ 监控 API)并观察应用程序使用的消息的外部影响。
对于此类测试,您需要从 CI 开始并部署您的应用程序,然后再开始测试。
对于那种情况,我会按照 CI 步骤
- 构建应用程序的步骤
- 步骤 运行 src/test 目录中的所有测试(单元、集成)
- 将应用程序部署到测试环境或启动docker化应用程序的步骤
- 步骤 运行s 外部测试
- 对于 docker 化环境,停止 docker 容器的步骤
考虑docker化环境
对于外部测试,您可以 运行 您的应用程序以及 Docker 中的测试 RabbitMQ 实例。您将需要两个 docker 个容器。
- 一个有申请
- 一个与 rabbitMQ 。 rabbitmq https://hub.docker.com/_/rabbitmq/ 有官方 docker 镜像,非常好用
对于运行这两张图片,写成docker-compose文件是最合理的。
我已经构建了很多这样的测试。我已经抛出了一些基本代码
GitHub here with .NET Core 2.0.
您将需要一个 RabbitMQ 集群来进行这些自动化测试。每个测试都从消除队列开始,以确保不存在任何消息。来自另一个测试的预先存在的消息将中断当前测试。
我有一个简单的帮手来删除队列。在我的应用程序中,他们总是声明自己的队列,但如果不是你的情况,那么你将不得不再次创建队列以及对任何交换的任何绑定。
public class QueueDestroyer
{
public static void DeleteQueue(string queueName, string virtualHost)
{
var connectionFactory = new ConnectionFactory();
connectionFactory.HostName = "localhost";
connectionFactory.UserName = "guest";
connectionFactory.Password = "guest";
connectionFactory.VirtualHost = virtualHost;
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDelete(queueName);
connection.Close();
}
}
我创建了一个非常简单的消费者示例来代表您的微服务。它 运行 在任务中直到被取消。
public class Consumer
{
private IMessageProcessor _messageProcessor;
private Task _consumerTask;
public Consumer(IMessageProcessor messageProcessor)
{
_messageProcessor = messageProcessor;
}
public void Consume(CancellationToken token, string queueName)
{
_consumerTask = Task.Run(() =>
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
_messageProcessor.ProcessMessage(message);
};
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
while (!token.IsCancellationRequested)
Thread.Sleep(1000);
}
}
});
}
public void WaitForCompletion()
{
_consumerTask.Wait();
}
}
消费者有一个 IMessageProcessor 接口,它将完成处理消息的工作。在我的集成测试中,我创建了一个假的。您可能会为此使用您喜欢的模拟框架。
测试发布者向队列发布消息。
public class TestPublisher
{
public void Publish(string queueName, string message)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);
}
}
}
我的示例测试如下所示:
[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
// ARRANGE
QueueDestroyer.DeleteQueue("queueX", "/");
var cts = new CancellationTokenSource();
var fake = new FakeProcessor();
var myMicroService = new Consumer(fake);
// ACT
myMicroService.Consume(cts.Token, "queueX");
var producer = new TestPublisher();
producer.Publish("queueX", "hello");
Thread.Sleep(1000); // make sure the consumer will have received the message
cts.Cancel();
// ASSERT
Assert.Equal(1, fake.Messages.Count);
Assert.Equal("hello", fake.Messages[0]);
}
我的假货是这样的:
public class FakeProcessor : IMessageProcessor
{
public List<string> Messages { get; set; }
public FakeProcessor()
{
Messages = new List<string>();
}
public void ProcessMessage(string message)
{
Messages.Add(message);
}
}
额外的建议是:
如果您可以将随机文本附加到队列并在每次测试中交换名称运行,那么这样做可以避免并发测试相互干扰
如果您的应用程序不这样做,我在代码中也有一些帮助程序用于声明队列、交换和绑定。
编写一个连接杀手 class,它将强制关闭连接并检查您的应用程序是否仍在运行并可以恢复。我有这方面的代码,但不是在 .NET Core 中。只要问我,我可以在 .NET Core 中将其修改为 运行。
一般来说,我认为您应该避免在集成测试中包含其他微服务。例如,如果您从一项服务向另一项服务发送消息并期望返回消息,则创建一个可以模拟预期行为的虚假消费者。如果您收到来自其他服务的消息,请在您的集成测试项目中创建虚假发布者。
我有一个使用 RabbitMQ 客户端接收消息的 .net 微服务,我需要测试以下内容:
1-消费者成功连接到rabbitMq主机。
2- 消费者正在监听队列。
3- 消费者正在成功接收消息。
为了实现上述目标,我创建了一个发送消息的示例应用程序,我正在调试消费者以确保它正在接收消息。
我怎样才能使这个测试自动化?因此将其包含在我的微服务中 CI.
我正在考虑将我的示例应用程序包含在我的 CI 中,这样我就可以触发消息,然后 运行 消费者单元测试等待特定时间然后在收到消息时通过,但是这对我来说这似乎是一种错误的做法,因为测试要等到消息被触发几秒钟后才会开始。
我想到的另一种方法是从单元测试本身触发示例应用程序,但如果示例应用程序无法运行,则会导致服务故障。
是否有任何通过 RabbitMQ 连接的微服务集成测试的最佳实践?
我成功地做了这样的测试。您需要 RabbitMQ 的测试实例、用于发送消息的测试交换器和用于连接接收消息的测试队列。
不要嘲笑一切!
但是,对于 rabbitMQ 的测试消费者、生产者和测试实例,该测试中没有实际的生产代码。
使用测试rabbitMQ实例和实际应用
为了进行有意义的完整测试,我将使用测试 RabbitMQ 实例、交换器和队列,但保留实际应用程序(生产者和消费者)。
我会实现以下场景
当测试应用程序执行某些测试消息到 rabbitMQ 的操作时
那么rabbitMQ接收到的消息数量就增加了
应用程序在收到消息时执行它应该执行的操作
第 1 步和第 3 步是特定于应用程序的。您的应用程序根据某些外部事件(收到 HTTP 消息?计时器事件?)向 rabbitMQ 发送消息。您可以在测试中重现这种情况,因此应用程序将发送消息(以测试 rabbitMQ 实例)。
收到消息后验证应用程序操作的情况相同。应用程序应该在收到消息后做一些可观察的事情。 如果应用程序进行 HTTP 调用 - 那么您可以模拟该 HTTP 端点并验证收到的消息。如果应用程序将消息保存到数据库中 - 您可以使用数据库来查找您的消息。
使用rabbitMQ监控API
第 2 步可以使用 RabbitMQ 监控来实现 API(有一些方法可以查看从队列接收和使用的消息数量 https://www.rabbitmq.com/monitoring.html#rabbitmq-metrics)
考虑使用 spring 引导进行健康检查
如果您是 java-based 然后使用 Spring Boot 将大大简化您的问题。您将自动获得 rabbitMQ 连接的健康检查!
有关如何使用 Spring 启动连接到 RabbitMQ 的教程,请参阅 https://spring.io/guides/gs/messaging-rabbitmq/。 Spring 启动应用程序为每个附加的外部资源(数据库、消息传递、jms 等)公开健康信息(使用 HTTP 端点 /health) 有关详细信息,请参阅 https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html#_auto_configured_healthindicators。
如果与 rabbitMQ 的连接断开,那么健康检查(由 org.springframework.boot.actuate.amqp.RabbitHealthIndicator 完成)将 return HTTP 代码 4xx 和 meaninfull json 消息在 JSON 正文中。
你不需要做任何特别的事情来进行健康检查 - 只需使用 org.springframework.boot:spring-boot-starter-amqp 作为 maven/gradle 依赖就足够了。
CI 测试- 来自 src/test 目录
我已经使用集成测试在 src/test 目录中编写了此类测试(连接到 RabbitMQ 的外部测试实例)。如果使用 Spring 启动,最简单的方法是使用测试配置文件,并在 application-test.properties 中有连接的详细信息来测试 RabbitMQ 实例(生产可以使用生产配置文件,而 application-production.properties带有 RabbitMQ 生产实例的文件)。
在最简单的情况下(只需验证与 rabbitMQ 的连接),您只需正常启动应用程序并验证 /health 端点。
在这种情况下,我会执行以下 CI 个步骤
- 构建(gradle 构建)
- 一个 运行 单元测试(没有任何外部依赖的测试)
- 运行 集成测试
CI 测试-外部
上述方法也可以用于部署到测试环境(并连接到测试 rabbitMQ 实例)的应用程序。应用程序启动后,您可以检查 /health 端点以确保它已连接到 rabbitMQ 实例。
如果您让您的应用程序向 rabbitMQ 发送消息,那么您可以观察 rabbbitMQ 指标(使用 rabbitMQ 监控 API)并观察应用程序使用的消息的外部影响。
对于此类测试,您需要从 CI 开始并部署您的应用程序,然后再开始测试。
对于那种情况,我会按照 CI 步骤
- 构建应用程序的步骤
- 步骤 运行 src/test 目录中的所有测试(单元、集成)
- 将应用程序部署到测试环境或启动docker化应用程序的步骤
- 步骤 运行s 外部测试
- 对于 docker 化环境,停止 docker 容器的步骤
考虑docker化环境
对于外部测试,您可以 运行 您的应用程序以及 Docker 中的测试 RabbitMQ 实例。您将需要两个 docker 个容器。
- 一个有申请
- 一个与 rabbitMQ 。 rabbitmq https://hub.docker.com/_/rabbitmq/ 有官方 docker 镜像,非常好用
对于运行这两张图片,写成docker-compose文件是最合理的。
我已经构建了很多这样的测试。我已经抛出了一些基本代码 GitHub here with .NET Core 2.0.
您将需要一个 RabbitMQ 集群来进行这些自动化测试。每个测试都从消除队列开始,以确保不存在任何消息。来自另一个测试的预先存在的消息将中断当前测试。
我有一个简单的帮手来删除队列。在我的应用程序中,他们总是声明自己的队列,但如果不是你的情况,那么你将不得不再次创建队列以及对任何交换的任何绑定。
public class QueueDestroyer
{
public static void DeleteQueue(string queueName, string virtualHost)
{
var connectionFactory = new ConnectionFactory();
connectionFactory.HostName = "localhost";
connectionFactory.UserName = "guest";
connectionFactory.Password = "guest";
connectionFactory.VirtualHost = virtualHost;
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDelete(queueName);
connection.Close();
}
}
我创建了一个非常简单的消费者示例来代表您的微服务。它 运行 在任务中直到被取消。
public class Consumer
{
private IMessageProcessor _messageProcessor;
private Task _consumerTask;
public Consumer(IMessageProcessor messageProcessor)
{
_messageProcessor = messageProcessor;
}
public void Consume(CancellationToken token, string queueName)
{
_consumerTask = Task.Run(() =>
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
_messageProcessor.ProcessMessage(message);
};
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
while (!token.IsCancellationRequested)
Thread.Sleep(1000);
}
}
});
}
public void WaitForCompletion()
{
_consumerTask.Wait();
}
}
消费者有一个 IMessageProcessor 接口,它将完成处理消息的工作。在我的集成测试中,我创建了一个假的。您可能会为此使用您喜欢的模拟框架。
测试发布者向队列发布消息。
public class TestPublisher
{
public void Publish(string queueName, string message)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);
}
}
}
我的示例测试如下所示:
[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
// ARRANGE
QueueDestroyer.DeleteQueue("queueX", "/");
var cts = new CancellationTokenSource();
var fake = new FakeProcessor();
var myMicroService = new Consumer(fake);
// ACT
myMicroService.Consume(cts.Token, "queueX");
var producer = new TestPublisher();
producer.Publish("queueX", "hello");
Thread.Sleep(1000); // make sure the consumer will have received the message
cts.Cancel();
// ASSERT
Assert.Equal(1, fake.Messages.Count);
Assert.Equal("hello", fake.Messages[0]);
}
我的假货是这样的:
public class FakeProcessor : IMessageProcessor
{
public List<string> Messages { get; set; }
public FakeProcessor()
{
Messages = new List<string>();
}
public void ProcessMessage(string message)
{
Messages.Add(message);
}
}
额外的建议是:
如果您可以将随机文本附加到队列并在每次测试中交换名称运行,那么这样做可以避免并发测试相互干扰
如果您的应用程序不这样做,我在代码中也有一些帮助程序用于声明队列、交换和绑定。
编写一个连接杀手 class,它将强制关闭连接并检查您的应用程序是否仍在运行并可以恢复。我有这方面的代码,但不是在 .NET Core 中。只要问我,我可以在 .NET Core 中将其修改为 运行。
一般来说,我认为您应该避免在集成测试中包含其他微服务。例如,如果您从一项服务向另一项服务发送消息并期望返回消息,则创建一个可以模拟预期行为的虚假消费者。如果您收到来自其他服务的消息,请在您的集成测试项目中创建虚假发布者。