Quarkus - Kafka Streams - 如何将配置选项传递给底层生产者和消费者
Quarkus - Kafka Streams - How to pass through config options to underlying producer and consumer
Quarkus' Kafka-Streams extension 提供了一种启动管道的便捷方式。流应用程序的必要配置选项,例如 quarkus.kafka-streams.bootstrap-servers=localhost:9092
必须插入包含项目的 application.properties
文件中。
Quarkus 还为更精细的配置提供了直通选项。文档指出:
All the properties within the kafka-streams namespace are passed through as-is to the Kafka Streams engine. Changing their values requires a rebuild of the application.
例如,我们可以通过自定义时间戳提取器(或与流配置相关的任何其他配置 属性)
...
kafka-streams.default.timestamp.extractor=my.custom.extractor.class
...
启动时,Quarkus 扩展打印出此配置 属性 未知,但我们可以看到它已正确传递,因为 my.custom.extractor.class
出现在自动打印的配置中流应用程序。
我们如何将选项传递给底层的 Kafka 生产者和消费者并验证它们的使用?例如,我想更改底层生产者的 max.request.size
和消费者的 max.partition.fetch.bytes
属性。
原生 Kafka Streams 和 old Quarkus approach 允许我直接在代码中更改传递的配置对象,例如
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 6000000);
IMO 这在 Quarkusland 中很脏,我想通过 application.properties
文件传递所有选项。
您可以使用消费者和生产者的标准配置传递它,前缀为 kafka-streams:
kafka-streams.consumer.$property
kafka-streams.producer.$property
Quarkus' Kafka-Streams extension 提供了一种启动管道的便捷方式。流应用程序的必要配置选项,例如 quarkus.kafka-streams.bootstrap-servers=localhost:9092
必须插入包含项目的 application.properties
文件中。
Quarkus 还为更精细的配置提供了直通选项。文档指出:
All the properties within the kafka-streams namespace are passed through as-is to the Kafka Streams engine. Changing their values requires a rebuild of the application.
例如,我们可以通过自定义时间戳提取器(或与流配置相关的任何其他配置 属性)
...
kafka-streams.default.timestamp.extractor=my.custom.extractor.class
...
启动时,Quarkus 扩展打印出此配置 属性 未知,但我们可以看到它已正确传递,因为 my.custom.extractor.class
出现在自动打印的配置中流应用程序。
我们如何将选项传递给底层的 Kafka 生产者和消费者并验证它们的使用?例如,我想更改底层生产者的 max.request.size
和消费者的 max.partition.fetch.bytes
属性。
原生 Kafka Streams 和 old Quarkus approach 允许我直接在代码中更改传递的配置对象,例如
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 6000000);
IMO 这在 Quarkusland 中很脏,我想通过 application.properties
文件传递所有选项。
您可以使用消费者和生产者的标准配置传递它,前缀为 kafka-streams:
kafka-streams.consumer.$property
kafka-streams.producer.$property