使用 "consume-transform-produce" 集成测试的 Kafka exactly once 消息传递测试
Kafka exactly-once messaging test with a "consume-transform-produce" integration test
I am writing test cases to test my application's Kafka consume-transform-produce loop. Effectively, I am consuming from a source topic, processing, and sending the message to a destination topic. I am writing these test cases to prove exactly-once messaging with Kafka, since I will be adding other failure cases later.
Here is my configuration:
private Map<String, Object> consConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(
            KafkaTestUtils.consumerProps(AB_CONSUMER_GROUP_ID, "false", kafkaBroker));
    props.put(ConsumerConfig.GROUP_ID_CONFIG, AB_CONSUMER_GROUP_ID);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}
private Map<String, Object> prodConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
            "prod-txn-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}
public KafkaMessageListenerContainer<String, NormalUser> fetchContainer() {
    ContainerProperties containerProperties = new ContainerProperties(ABTOPIC, XYTOPIC, PATOPIC);
    containerProperties.setGroupId("groupId-10001");
    containerProperties.setAckMode(AckMode.MANUAL);
    containerProperties.setSyncCommits(true);
    containerProperties.setSyncCommitTimeout(Duration.ofMillis(5000));
    containerProperties.setTransactionManager(kafkaTransactionManager());
    KafkaMessageListenerContainer<String, NormalUser> kafkaMessageListContainer =
            new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
    kafkaMessageListContainer.setupMessageListener(new AcknowledgingMessageListener<String, NormalUser>() {
        @Override
        public void onMessage(ConsumerRecord<String, NormalUser> record, Acknowledgment acknowledgment) {
            log.debug("test-listener received message='{}'", record.toString());
            records.add(record);
            acknowledgment.acknowledge();
        }
    });
    return kafkaMessageListContainer;
}
@Test
public void testProducerABSuccess() throws InterruptedException, IOException {
    NormalUser userObj = new NormalUser(ABTypeGood,
            Double.valueOf(Math.random() * 10000).longValue(),
            "Blah" + String.valueOf(Math.random() * 10));
    sendMessage(XYTOPIC, "AB-id", userObj);
    try {
        ConsumerRecords<String, NormalUser> records;
        parserConsumer.subscribe(Collections.singletonList(XYTOPIC));
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new LinkedHashMap<>();
        // Check for messages
        parserProducer.beginTransaction();
        records = parserConsumer.poll(Duration.ofSeconds(3));
        assertThat(1).isEqualTo(records.count()); // --> this assert passes about 50% of the time
        for (ConsumerRecord<String, NormalUser> record : records) {
            assertEquals(record.key(), "AB-id");
            assertEquals(record.value(), userObj);
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset()));
        }
        parserProducer.send(new ProducerRecord<String, NormalUser>(ABTOPIC, "AB-id", userObj));
        parserProducer.sendOffsetsToTransaction(currentOffsets, AB_CONSUMER_GROUP_ID);
        parserProducer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        parserProducer.close();
    } catch (final KafkaException e) {
        parserProducer.abortTransaction();
    }
    ConsumerRecords<String, NormalUser> records;
    loadConsumer.subscribe(Collections.singletonList(ABTOPIC));
    records = loadConsumer.poll(Duration.ofSeconds(3));
    assertThat(1).isEqualTo(records.count()); // --> this assert fails every time
    for (ConsumerRecord<String, NormalUser> record : records) {
        assertEquals(record.key(), "AB-id");
        assertEquals(record.value(), userObj);
    }
}
My problem is that the test case "testProducerABSuccess" above is inconsistent: the asserts sometimes fail and sometimes pass. I cannot figure out why they are so inconsistent. What is wrong with the above?
Edit 16-12:
- Tested with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to earliest: no change. The first assert passes about 70% of the time. The second assert fails consistently (0% pass rate).
Which assertion fails? If it is assertThat(1).isEqualTo(records.count());, it is probably because you are setting auto.offset.reset to latest. It needs to be earliest to avoid the race condition whereby the record is sent before the consumer is assigned the partition.
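To make that concrete, here is a minimal sketch of how the test could sidestep the race regardless of the auto.offset.reset setting: force the consumer to join the group and receive its partition assignment before the record is produced. It reuses parserConsumer, sendMessage, XYTOPIC and userObj from the question; the polling loop itself is an assumption, not code from the post.

// Subscribe first; poll() drives the group-join/rebalance protocol, so loop
// until a partition is actually assigned (with a timeout to fail fast).
parserConsumer.subscribe(Collections.singletonList(XYTOPIC));
long deadline = System.currentTimeMillis() + 10_000;
while (parserConsumer.assignment().isEmpty() && System.currentTimeMillis() < deadline) {
    parserConsumer.poll(Duration.ofMillis(100));
}
assertThat(parserConsumer.assignment()).isNotEmpty();

// Only now send the test record: the consumer's position is already
// established, so auto.offset.reset is never consulted.
sendMessage(XYTOPIC, "AB-id", userObj);

ConsumerRecords<String, NormalUser> records = parserConsumer.poll(Duration.ofSeconds(3));

If the test goes through a spring-kafka listener container rather than a raw Consumer, ContainerTestUtils.waitForAssignment(container, partitions) from spring-kafka-test serves the same purpose.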