Spring Cloud Stream Kafka Commit Failed since the group is rebalanced
I am getting a CommitFailedException in a Spring Cloud Stream application that does some time-consuming processing. I know that to fix this I need to set max.poll.records and max.poll.interval.ms to match my expectations for the time a batch takes to process. However, I am not sure how to set them for the consumer in Spring Cloud Stream.
Exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:808)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:691)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1554)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1418)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:739)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Also, how can I guarantee this never happens at all? Or alternatively, how can I inject some kind of rollback when this exception occurs? The reason is that I do some other external work and, once it is done, I publish the output message accordingly. So if the message cannot be published after the work has been done on the external system, due to whatever problem, I have to revert it (some kind of atomic transaction spanning the Kafka publish and the other external systems).
Set your heartbeat interval to less than 1/3 of your session timeout. If the broker cannot determine whether your consumer is alive, it will initiate a partition rebalance among the remaining consumers. There is a heartbeat thread that notifies the broker that the consumer is alive, in case the application takes a little longer to process a batch. Change these in your consumer configuration:
heartbeat.interval.ms
session.timeout.ms
If that does not work, try increasing the session timeout. You will have to fiddle with these values.
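In Spring Cloud Stream, a minimal application.properties sketch passing these through the binder's consumer properties (the values below are illustrative only, keeping the heartbeat under 1/3 of the session timeout; tune both to your workload):
spring.cloud.stream.kafka.binder.consumerProperties.heartbeat.interval.ms=10000
spring.cloud.stream.kafka.binder.consumerProperties.session.timeout.ms=30000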
You can set arbitrary Kafka properties at the binder level (documentation here):
spring.cloud.stream.kafka.binder.consumerProperties
Key/Value map of arbitrary Kafka client consumer properties. In addition to supporting known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.
Default: Empty map.
For example: spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
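A minimal application.properties sketch at the binder level covering both properties from the question (the values are placeholders to tune against your actual batch processing time, not recommendations):
spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
spring.cloud.stream.kafka.binder.consumerProperties.max.poll.interval.ms=600000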
Or at the binding level (documentation here):
spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration
Map with a key/value pair containing generic Kafka consumer properties. In addition to having Kafka consumer properties, other configuration properties can be passed here. For example some properties needed by the application such as spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar.
Default: Empty map.
For example: spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
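The same two settings scoped to a single binding, assuming the binding is named input as in the documentation example (placeholder values again):
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.interval.ms=600000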
You can get notified of commit failures by adding an OffsetCommitCallback to the listener container's ContainerProperties and setting syncCommits to false. To customize the container and its properties, add a ListenerContainerCustomizer bean to the application.
EDIT
Asynchronous commit callback...
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {

    public static void main(String[] args) {
        SpringApplication.run(So57970152Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            // Commit after each record, asynchronously; the callback reports success or failure.
            container.getContainerProperties().setAckMode(AckMode.RECORD);
            container.getContainerProperties().setSyncCommits(false);
            container.getContainerProperties().setCommitCallback((map, ex) -> {
                if (ex == null) {
                    System.out.println("Successful commit for " + map);
                }
                else {
                    System.out.println("Commit failed for " + map + ": " + ex.getMessage());
                }
            });
            container.getContainerProperties().setClientId("so57970152");
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "foo".getBytes());
        };
    }

}
Manual commits (sync)...
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {

    public static void main(String[] args) {
        SpringApplication.run(So57970152Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            // MANUAL_IMMEDIATE commits synchronously, on the listener thread, as soon as acknowledge() is called.
            container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
            container.getContainerProperties().setClientId("so57970152");
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
        System.out.println(in);
        try {
            ack.acknowledge(); // MUST USE MANUAL_IMMEDIATE for this to work.
            System.out.println("Commit successful");
        }
        catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "foo".getBytes());
        };
    }

}