如何通过属性为 Spring Cloud Stream Kafka 设置偏移量提交间隔
How to set offset commit interval for Spring Cloud Stream Kafka via properties
我想通过属性为 Spring Cloud Stream 在 Kafka 消费者上设置自动偏移提交间隔。
正如我从指标中看到的那样,默认情况下 Spring Cloud Stream Kafka 会在每条消费的消息上提交偏移量。对于高负载主题(例如,如果流量是每秒 10K 条消息),它变得非常显着并增加了 Kafka 代理的负载。
我们通过以下方式声明消费者:
@Bean
public Consumer<TestEvent> testEvents() {
…
}
我尝试了几个选项,但没有任何帮助。
选项 #1
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
consumer-properties:
auto.offset.reset: latest
auto.commit.interval.ms: 2000
enable.auto.commit: true
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
使用这样的配置,消息消费正确,但健康检查的状态为down
:
{
"status": "DOWN",
"components": {
"binders": {
"status": "DOWN",
"components": {
"kafka": {
"status": "DOWN",
"details": {
"error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
}
}
}
}, …
据我了解,enable.auto.commit: true
不推荐用于 Spring Cloud Stream。
选项 #2
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
ack-mode: TIME
ack-time: 2000
而且,这样的配置 属性 ack-mode: TIME
仍然没有帮助,因为我们看到每个消费消息的偏移量提交。
我使用maven依赖spring-cloud-starter-stream-kafka
版本3.0.12.RELEASE
首先,ack-mode
是一个特定于kafka的消费者绑定属性,你在公共属性中有它。
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: localhost:9092
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
consumer:
ack-mode: TIME
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
其次,没有ack-time
属性,你必须通过容器定制器在容器上设置它:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
return (container, dest, group) -> {
container.getContainerProperties().setAckTime(2000L);
container.getContainerProperties().setLogContainerConfig(true);
};
}
Gary Russell 的回答绝对正确,我只想通过使用 spring.kafka.listener
属性和 KafkaProperties
(来自包 org.springframework.boot.autoconfigure.kafka
)添加另一个类似的替代方案:
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
kafka:
listener:
ack-mode: TIME
ack-time: 1000
和以下定制器:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(KafkaProperties kafkaProperties) {
return (container, destinationName, group) -> {
var listener = kafkaProperties.getListener();
var containerProperties = container.getContainerProperties();
containerProperties.setAckMode(listener.getAckMode());
containerProperties.setAckTime(listener.getAckTime().toMillis());
containerProperties.setLogContainerConfig(true);
};
}
我想通过属性为 Spring Cloud Stream 在 Kafka 消费者上设置自动偏移提交间隔。
正如我从指标中看到的那样,默认情况下 Spring Cloud Stream Kafka 会在每条消费的消息上提交偏移量。对于高负载主题(例如,如果流量是每秒 10K 条消息),它变得非常显着并增加了 Kafka 代理的负载。
我们通过以下方式声明消费者:
@Bean
public Consumer<TestEvent> testEvents() {
…
}
我尝试了几个选项,但没有任何帮助。
选项 #1
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
consumer-properties:
auto.offset.reset: latest
auto.commit.interval.ms: 2000
enable.auto.commit: true
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
使用这样的配置,消息消费正确,但健康检查的状态为down
:
{
"status": "DOWN",
"components": {
"binders": {
"status": "DOWN",
"components": {
"kafka": {
"status": "DOWN",
"details": {
"error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
}
}
}
}, …
据我了解,enable.auto.commit: true
不推荐用于 Spring Cloud Stream。
选项 #2
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
ack-mode: TIME
ack-time: 2000
而且,这样的配置 属性 ack-mode: TIME
仍然没有帮助,因为我们看到每个消费消息的偏移量提交。
我使用maven依赖spring-cloud-starter-stream-kafka
版本3.0.12.RELEASE
首先,ack-mode
是一个特定于kafka的消费者绑定属性,你在公共属性中有它。
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: localhost:9092
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
consumer:
ack-mode: TIME
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
其次,没有ack-time
属性,你必须通过容器定制器在容器上设置它:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
return (container, dest, group) -> {
container.getContainerProperties().setAckTime(2000L);
container.getContainerProperties().setLogContainerConfig(true);
};
}
Gary Russell 的回答绝对正确,我只想通过使用 spring.kafka.listener
属性和 KafkaProperties
(来自包 org.springframework.boot.autoconfigure.kafka
)添加另一个类似的替代方案:
spring:
cloud:
function:
definition: testEvents
stream:
kafka:
binder:
brokers: xxx
configuration:
auto.offset.reset: latest
bindings:
testEvents-in-0:
destination: test_topic
group: ${spring.application.name}_test_topic
consumer:
concurrency: 2
kafka:
listener:
ack-mode: TIME
ack-time: 1000
和以下定制器:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(KafkaProperties kafkaProperties) {
return (container, destinationName, group) -> {
var listener = kafkaProperties.getListener();
var containerProperties = container.getContainerProperties();
containerProperties.setAckMode(listener.getAckMode());
containerProperties.setAckTime(listener.getAckTime().toMillis());
containerProperties.setLogContainerConfig(true);
};
}