使用 "consume-transform-produce" 集成测试的 Kafka exactly once 消息传递测试

Kafka exactly once messaging test with "consume-transform-produce" Integration test

我正在编写测试用例来测试我的应用程序的 Kafka 消费-转换-生产循环。如此有效,我正在从 sourceTopic-processing-sendMessage 消费到 Destination 主题。我正在编写这些测试用例来证明与 Kafka 的 exactly once 消息传递,因为稍后我将添加其他失败案例。
这是我的配置:

private Map<String, Object> consConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(
            KafkaTestUtils.consumerProps(AB_CONSUMER_GROUP_ID, "false", kafkaBroker));
    props.put(ConsumerConfig.GROUP_ID_CONFIG, AB_CONSUMER_GROUP_ID);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

private Map<String, Object> prodConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "prod-txn-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

public KafkaMessageListenerContainer<String, NormalUser> fetchContainer() {
    ContainerProperties containerProperties = new ContainerProperties(ABTOPIC, XYTOPIC, PATOPIC);
    containerProperties.setGroupId("groupId-10001");
    containerProperties.setAckMode(AckMode.MANUAL);
    containerProperties.setSyncCommits(true);
    containerProperties.setSyncCommitTimeout(Duration.ofMillis(5000));
    containerProperties.setTransactionManager(kafkaTransactionManager());
    KafkaMessageListenerContainer<String, NormalUser> kafkaMessageListContainer = new KafkaMessageListenerContainer<>(
            consumerFactory(), containerProperties);
    kafkaMessageListContainer.setupMessageListener(new AcknowledgingMessageListener<String, NormalUser>() {
        @Override
        public void onMessage(ConsumerRecord<String, NormalUser> record, Acknowledgment acknowledgment) {
            log.debug("test-listener received message='{}'", record.toString());
            records.add(record);
            acknowledgment.acknowledge();
        }
    });
    return kafkaMessageListContainer;
}

    @Test
    public void testProducerABSuccess() throws InterruptedException, IOException {
        NormalUser userObj = new NormalUser(ABTypeGood,
                Double.valueOf(Math.random() * 10000).longValue(),
                "Blah" + String.valueOf(Math.random() * 10));
        sendMessage(XYTOPIC, "AB-id", userObj);
        try {
            ConsumerRecords<String, NormalUser> records;
            parserConsumer.subscribe(Collections.singletonList(XYTOPIC));
            Map<TopicPartition, OffsetAndMetadata> currentOffsets = new LinkedHashMap<>();
            // Check for messages
            parserProducer.beginTransaction();
            records = parserConsumer.poll(Duration.ofSeconds(3));
            assertThat(1).isEqualTo(records.count()); // --> this asserts passes like 50% of the time. 
            for (ConsumerRecord<String, NormalUser> record : records) {
                assertEquals(record.key(), "AB-id");
                assertEquals(record.value(), userObj);
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset()));
            }
            parserProducer.send(new ProducerRecord<String, NormalUser>(ABTOPIC, "AB-id", userObj));
            parserProducer.sendOffsetsToTransaction(currentOffsets, AB_CONSUMER_GROUP_ID);
            parserProducer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            parserProducer.close();
        } catch (final KafkaException e) {
            parserProducer.abortTransaction();
        }
        ConsumerRecords<String, NormalUser> records;
        loadConsumer.subscribe(Collections.singletonList(ABTOPIC));
        records = loadConsumer.poll(Duration.ofSeconds(3));
        assertThat(1).isEqualTo(records.count()); //--> this assert fails all the time. 
        for (ConsumerRecord<String, NormalUser> record : records) {
            assertEquals(record.key(), "AB-id");
            assertEquals(record.value(), userObj);
        }
    }

我的问题是上面的测试用例“testProducerABSuccess”不一致,断言有时会失败,有时会通过。我无法弄清楚为什么它们如此不一致。以上有什么问题。

编辑:16-12:

  1. 测试 consumerconfig.Auto_Offset_Reset_config-最早没有变化。第一个断言大约有 70% 的时间通过。第二个断言一直失败(0% 通过率)。

哪个断言失败了?如果是 assertThat(1).isEqualTo(records.count());,可能是因为您将 auto.offset.reset 设置为 latest。它需要 earliest 以避免竞争条件,即在为消费者分配分区之前发送记录。