Quarkus - Kafka Streams - 如何将配置选项传递给底层生产者和消费者

Quarkus - Kafka Streams - How to pass through config options to underlying producer and consumer

Quarkus' Kafka-Streams extension 提供了一种启动管道的便捷方式。流应用程序的必要配置选项,例如 quarkus.kafka-streams.bootstrap-servers=localhost:9092 必须插入包含项目的 application.properties 文件中。

Quarkus 还为更精细的配置提供了直通选项。文档指出:

All the properties within the kafka-streams namespace are passed through as-is to the Kafka Streams engine. Changing their values requires a rebuild of the application.

例如,我们可以通过自定义时间戳提取器(或与流配置相关的任何其他配置 属性)

...
kafka-streams.default.timestamp.extractor=my.custom.extractor.class
...

启动时,Quarkus 扩展打印出此配置 属性 未知,但我们可以看到它已正确传递,因为 my.custom.extractor.class 出现在自动打印的配置中流应用程序。

我们如何将选项传递给底层的 Kafka 生产者和消费者并验证它们的使用?例如,我想更改底层生产者的 max.request.size 和消费者的 max.partition.fetch.bytes 属性。

原生 Kafka Streams 和 old Quarkus approach 允许我直接在代码中更改传递的配置对象,例如

streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 6000000);

IMO 这在 Quarkusland 中很脏,我想通过 application.properties 文件传递​​所有选项。

您可以使用消费者和生产者的标准配置传递它,前缀为 kafka-streams:

kafka-streams.consumer.$property
kafka-streams.producer.$property

检查 -> https://quarkus.io/guides/kafka-streams