Kafka Streams 2.6, Partition Assignor and Rebalancing Strategy

I'm on Kafka 2.6 and using the Streams API, and I have a question. When I start a stream, it logs the Streams, Admin, Consumer, and Producer configs. I noticed something odd: although I provide the config

streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

as shown above, I see different partition assignment strategies in the consumer and streams logs.
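For reference, here is a minimal sketch of how I supply that property when building the application; the application ID, bootstrap servers, and topic name below are placeholders, not my actual values:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsApp {
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        // placeholder application id and brokers
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "APPID");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX:9092");
        // assignment strategy passed as a plain consumer config
        streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        // placeholder topic and no-op processing
        builder.stream("input-topic").foreach((key, value) -> { });

        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}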

Here is the consumer log showing the consumer config:

2021-01-20 15:52:32.611  INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
allow.auto.create.topics = true
auto.commit.interval.ms = 500
auto.offset.reset = none
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-restore-consumer
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

But I also see a log like the following:

2021-01-20 15:52:32.740  INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
allow.auto.create.topics = false
auto.commit.interval.ms = 500
auto.offset.reset = latest
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-consumer
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]

When I look at these two consumer logs, the main difference I notice is their client.id values.

I'm a bit confused about whether I have actually enabled the CooperativeStickyAssignor.

What is the difference between these two consumers that causes them to use different partition assignment strategies?

Is it normal to see different consumer configs within the same Kafka Streams application?

Thanks

The first consumer log in your question is from the "restore" consumer that handles state store recovery; you can see the word "restore" in its client.id. The second consumer log you show is from the consumer that processes your own topology (the stream thread's main consumer), and the strategy that consumer uses appears to be the StreamsPartitionAssignor.
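As far as I know, Kafka Streams always installs its own StreamsPartitionAssignor on that main consumer, so a partition.assignment.strategy passed as a plain consumer config does not end up there. If you want a setting to reach only one of the internal clients, StreamsConfig provides prefix helpers; here is a minimal sketch, with the specific config keys and values chosen purely for illustration:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();

// applies to every internal consumer (main, restore, global) unless overridden below
streamsConfiguration.put(
        StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);

// applies only to the stream thread's main consumer ("...-StreamThread-N-consumer")
streamsConfiguration.put(
        StreamsConfig.mainConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");

// applies only to the restore consumer ("...-StreamThread-N-restore-consumer")
streamsConfiguration.put(
        StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);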