集成测试 RabbitMQ listner - 间歇性失败,因为消息需要时间排队
Integration Testing RabbitMQ listner - Intermittently failing because the message takes time to be queued
我已经使用 RabbitMock 为以下流程编写了集成测试(在 github 上找到它,看起来真的很酷):
消息添加到传入消息队列 --> 传入消息侦听器拾取消息 --> 处理它 --> 将新的传出消息放入新队列传出消息队列 --> (仅用于测试)在 src/test/resources 中为这个传出队列写了一个监听器。
一切正常(有一个重要的故障 - 间歇性超时),我正在做如下所示的断言:
List<OutgoingData> receivedMessages = new ArrayList<>();
assertTimeoutPreemptively(ofMillis(15000L), () -> {
while (receivedMessages.isEmpty()) {
OutgoingData data =
receiver.getOutgoingData();
if(data != null){
receivedMessages.add(data);
}
}
}
);
assertThat(receivedMessages.get(0)).isNotNull();
assertThat
(receivedMessages.get(0).getRecipient())
.isEqualTo("enabled@localhost");
这个测试中的超时是我面临的真实问题。
- 由于超时,测试越来越慢。
- 如果我删除超时,测试将卡在 Jenkins 中,需要强制终止。
- 有时,这个 15000 毫秒的超时时间也不够,导致测试失败。
我想知道在集成测试中有没有更好的方法来处理这种情况
期待您的意见。
非常感谢,
榕树
当我仔细考虑并与我的一位团队成员讨论这个问题时,我突然想到 java 8 个期货可以在这里有效地使用。
我按如下方式实现了它,效果非常好。
@Test
public void basic_consume_case()
InterruptedException, ExecutionException {
IncomingData incomingData = new IncomingData();
incomingData.setRecipient("enabled@localhost");
incomingData.setSender("unblocked@localhost");
incomingData.setStoreKey("123");
incomingData.setSubject("Hello");
incomingData.setText("Hello from Test 1");
try (AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext(
ConnectionFactoryConfiguration.class)) {
sendMessage(incomingData);
Future<OutgoingData> receivedMessageFuture = pollOutgoingQueueForData();
OutgoingData receivedMessage = receivedMessageFuture.get();
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getRecipient()).isEqualTo("enabled@localhost");
assertThat(receivedMessage.getContent())
...
}
}
private void sendMessage(IncomingData incomingData) {
try {
rabbitTemplate.convertAndSend("incoming-data-queue", incomingData, m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
} finally {
}
}
private Future<OutgoingData> pollOutgoingQueueForData() throws InterruptedException {
return executor.submit(() -> {
OutgoingData receivedMessage = null;
while (receivedMessage == null) {
receivedMessage = (OutgoingData)
rabbitTemplate.receiveAndConvert("outgoing-queue");
}
return receivedMessage;
});
}
我已经使用 RabbitMock 为以下流程编写了集成测试(在 github 上找到它,看起来真的很酷):
消息添加到传入消息队列 --> 传入消息侦听器拾取消息 --> 处理它 --> 将新的传出消息放入新队列传出消息队列 --> (仅用于测试)在 src/test/resources 中为这个传出队列写了一个监听器。
一切正常(有一个重要的故障 - 间歇性超时),我正在做如下所示的断言:
List<OutgoingData> receivedMessages = new ArrayList<>();
assertTimeoutPreemptively(ofMillis(15000L), () -> {
while (receivedMessages.isEmpty()) {
OutgoingData data =
receiver.getOutgoingData();
if(data != null){
receivedMessages.add(data);
}
}
}
);
assertThat(receivedMessages.get(0)).isNotNull();
assertThat
(receivedMessages.get(0).getRecipient())
.isEqualTo("enabled@localhost");
这个测试中的超时是我面临的真实问题。
- 由于超时,测试越来越慢。
- 如果我删除超时,测试将卡在 Jenkins 中,需要强制终止。
- 有时,这个 15000 毫秒的超时时间也不够,导致测试失败。
我想知道在集成测试中有没有更好的方法来处理这种情况
期待您的意见。
非常感谢, 榕树
当我仔细考虑并与我的一位团队成员讨论这个问题时,我突然想到 java 8 个期货可以在这里有效地使用。
我按如下方式实现了它,效果非常好。
@Test
public void basic_consume_case()
InterruptedException, ExecutionException {
IncomingData incomingData = new IncomingData();
incomingData.setRecipient("enabled@localhost");
incomingData.setSender("unblocked@localhost");
incomingData.setStoreKey("123");
incomingData.setSubject("Hello");
incomingData.setText("Hello from Test 1");
try (AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext(
ConnectionFactoryConfiguration.class)) {
sendMessage(incomingData);
Future<OutgoingData> receivedMessageFuture = pollOutgoingQueueForData();
OutgoingData receivedMessage = receivedMessageFuture.get();
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getRecipient()).isEqualTo("enabled@localhost");
assertThat(receivedMessage.getContent())
...
}
}
private void sendMessage(IncomingData incomingData) {
try {
rabbitTemplate.convertAndSend("incoming-data-queue", incomingData, m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
} finally {
}
}
private Future<OutgoingData> pollOutgoingQueueForData() throws InterruptedException {
return executor.submit(() -> {
OutgoingData receivedMessage = null;
while (receivedMessage == null) {
receivedMessage = (OutgoingData)
rabbitTemplate.receiveAndConvert("outgoing-queue");
}
return receivedMessage;
});
}