Kafka Consumer Rebalancing takes too long
I have a Kafka Streams application that takes data from a few topics and joins it into another topic.

Kafka configuration:

5 Kafka brokers
Kafka topics - 15 partitions, replication factor 3.

Note: I am running the Kafka Streams applications on the same machines as my Kafka brokers.

A few million records are consumed/produced every hour. Whenever I take any Kafka broker down, the group goes into rebalancing, and the rebalance takes approx. 30 minutes, sometimes even longer.

Does anyone know how to solve the rebalancing issue for the Kafka consumers? Also, it often throws exceptions while rebalancing.

This is stopping us from going live in production with this setup. Any help would be appreciated.
Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
Kafka Streams configuration:
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000
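For reference, the Streams configuration above can be built programmatically roughly like this (a sketch; the application id "conversion-live" is inferred from the client.id in the ConsumerConfig dump below, and keys Streams does not use itself are forwarded to the embedded consumer/producer):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Application id is a guess based on the client.id seen in the config dump.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
        "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
// Consumer-level settings are passed through to the embedded consumers.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);
```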
The ConsumerConfig it creates internally is:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
check.crcs = true
client.id = conversion-live-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
I recommend configuring StandbyTasks via the parameter num.standby.replicas=1 (default is 0). This should help to reduce the rebalance time significantly.

Furthermore, I recommend upgrading your application to Kafka 0.11. Note that the Streams API in 0.11 is backward compatible with 0.10.1 and 0.10.2 brokers, so you do not need to upgrade your brokers for this. Rebalance behavior was improved a lot in 0.11 and will be improved further in the upcoming 1.0 release (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+interface+for+the+state+store+restoration+process), so upgrading your application to the latest version is always an improvement with respect to rebalancing.
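For example, assuming the application is configured through a Properties object, enabling standby tasks is one extra setting (a sketch, not the asker's actual config):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Keep one warm replica of each task's state store on another instance,
// so after a failure the new owner can take over without a full state restore.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
```

The standby replica consumes the state changelog continuously, so on failover only the small tail of the changelog has to be replayed instead of the whole store.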
In my experience:

First, considering your workload, your max.poll.records is too small: a few million records are consumed/produced every hour. If max.poll.records is too small, say 1, then rebalancing takes a very long time. I don't know the exact reason.

Second, please make sure the partition counts of your streams application's input topics are consistent. E.g. if APP-1 has two input topics A and B, and A has 4 partitions while B has 2, then rebalancing takes a very long time. However, if A and B both have 4 partitions, even with some partitions sitting idle, the rebalance time is acceptable.

Hope it helps.
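To illustrate the first point, max.poll.records can be raised in the Streams configuration; a sketch (1000 is an arbitrary example value to tune against your record size and per-record processing time):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Larger poll batches mean fewer poll() round trips while catching up /
// restoring state after a rebalance. Example value only; if processing a
// batch takes too long you risk hitting max.poll.interval.ms instead.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
```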