Kafka-streams:为什么所有分区都分配给消费者组中的同一个消费者?
Kafka-streams: Why do all partitions get assigned to the same consumer in the consumergroup?
背景
多台机器生成事件。这些事件被发送到我们的 Kafka 集群,其中每台机器都有自己的主题 (app.machine-events.machine-name)。因为顺序在每台机器上很重要,而且分区大小目前不是问题,所以所有主题都由一个分区组成。因此,目前 N 个主题也意味着 N 个分区。
consuming/processing 应用程序使用了 kafka-streams,我们已经给出了 StreamsConfig.APPLICATION_ID_CONFIG
/"application.id"
'machine-event-processor',每个实例都保持不变,这意味着他们被放入 Kafka 的同一个消费者组。这个消费者订阅了模式 app.machine-events.*
,至于处理器,它处理哪台机器的事件并不重要。 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group machine-event-processor --members --verbose
向我显示了一个列表,该列表匹配所有处理服务的 IP 数量 运行ning。
预计
给定 20 台机器和 5 个处理器实例,我们希望每个处理器处理约 4 个分区(因此约 4 个主题)。
实际上
有一个处理器处理 20 个分区(因此有 20 个主题),另外 4 个处理器在 all/idling 处什么也不做。杀死 'lucky' 处理器,所有 20 个分区都重新平衡到另一个处理器,导致新处理器处理 20 partitions/topics,3 个处理器空闲。
到目前为止我尝试了什么
- 查看 partition.grouper。我不觉得我完全理解它,但据我所知,无论如何只有 DefaultPartitioner 选项,并且没有必要编写自定义选项,因为(根据文档)此设置应该有效。它确实提到分区根据它们的分区键加入任务(对我们来说都是 0,因为每个主题只有一个分区),但我无法完全理解这部分。
- 为消费者使用 RoundRobinAssignor:
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), new RoundRobinAssignor().getClass.getName)
(尝试了几个值,因为似乎没有任何改变。)
- 查看其他 configuration properties,看看我是否遗漏了什么:None,我想。
代码,简化
val streamConfig = new Properties
// {producer.metadata.max.age.ms=5000, consumer.metadata.max.age.ms=5000, default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor, bootstrap.servers=kafka:9092, application.id=machine-event-processor, default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde}
val builder: StreamsBuilder = new StreamsBuilder
val topicStream: KStream[String, Array[Byte]] = builder.stream(Pattern.compile("app.machine-events.*"))
topicStream.process(new MessageProcessorSupplier(context)) // The event is delegated to a processor, doing the actual processing logic
val eventStreams = new KafkaStreams(builder.build(), streamConfig)
eventStreams.start()
备注
正在使用 Kafka-streams 2.0.0:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
Kafka 正在 运行 容器内,使用 wurstmeister/kafka:2.11-2.0.0
版本。 docker-stack.yml服务:
kafka:
image: wurstmeister/kafka:2.11-2.0.0
ports:
- target: 9094
published: 9094
protocol: tcp
mode: host
volumes:
- /var/run/docker.sock:/var/run/docker.sock
healthcheck:
test: ["CMD-SHELL", "$$(netstat -ltn | grep -q 9092)"]
interval: 15s
timeout: 10s
retries: 5
environment:
HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 36000
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
deploy:
replicas: 2
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
window: 120s
- Kafka 采用双节点设置,形成一个集群。通过 docker 环境变量,我们将复制因子设置为
2
,因此每个分区应该在每个节点上都有一个复制。
相关topics/questions/discussions我找到并检查了
https://faust.readthedocs.io/en/latest/developerguide/partition_assignor.html
检查了 Kafka mail archives 但没有找到任何东西
全方位搜索其他运行进入这个问题,但没有给我我需要的答案。还发现 KAFKA-7144 但这对我们来说应该不是问题,因为我们正在 运行ning 2.0.0
如果谁有运行类似的问题,或者能指出我可能很愚蠢的错误,请赐教!
为了以后的读者 运行 遇到同样的问题,解决方案是不使用 N 个主题,每个主题都有 1 个分区,而是使用 1 个主题和 N 个分区。即使有 120 个分区和 400+ machines/event-sources,多个事件类型将被放入同一个分区,但这并不影响事件的顺序。
实现是将记录键设置为机器名,并让底层逻辑负责哪个值进入哪个分区。由于我们现在有一个消费者组,其中有 X 个消费者订阅了这个主题,分区将平均分配给消费者,每个分区有 120/X 个分区。
正如 Matthias 所建议的那样,在 Devoxx Belgium 2018 的 Confluent 的其他帮助人员进一步证实了这一点。谢谢!
提示
使用wurstmeister/kafkadocker图像时,考虑使用环境属性:
KAFKA_CREATE_TOPICS: "app.machine-events:120:2"
意思是
topic-name:number-of-partitions:replication-factor
背景
多台机器生成事件。这些事件被发送到我们的 Kafka 集群,其中每台机器都有自己的主题 (app.machine-events.machine-name)。因为顺序在每台机器上很重要,而且分区大小目前不是问题,所以所有主题都由一个分区组成。因此,目前 N 个主题也意味着 N 个分区。
consuming/processing 应用程序使用了 kafka-streams,我们已经给出了 StreamsConfig.APPLICATION_ID_CONFIG
/"application.id"
'machine-event-processor',每个实例都保持不变,这意味着他们被放入 Kafka 的同一个消费者组。这个消费者订阅了模式 app.machine-events.*
,至于处理器,它处理哪台机器的事件并不重要。 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group machine-event-processor --members --verbose
向我显示了一个列表,该列表匹配所有处理服务的 IP 数量 运行ning。
预计
给定 20 台机器和 5 个处理器实例,我们希望每个处理器处理约 4 个分区(因此约 4 个主题)。
实际上
有一个处理器处理 20 个分区(因此有 20 个主题),另外 4 个处理器在 all/idling 处什么也不做。杀死 'lucky' 处理器,所有 20 个分区都重新平衡到另一个处理器,导致新处理器处理 20 partitions/topics,3 个处理器空闲。
到目前为止我尝试了什么
- 查看 partition.grouper。我不觉得我完全理解它,但据我所知,无论如何只有 DefaultPartitioner 选项,并且没有必要编写自定义选项,因为(根据文档)此设置应该有效。它确实提到分区根据它们的分区键加入任务(对我们来说都是 0,因为每个主题只有一个分区),但我无法完全理解这部分。
- 为消费者使用 RoundRobinAssignor:
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), new RoundRobinAssignor().getClass.getName)
(尝试了几个值,因为似乎没有任何改变。) - 查看其他 configuration properties,看看我是否遗漏了什么:None,我想。
代码,简化
val streamConfig = new Properties
// {producer.metadata.max.age.ms=5000, consumer.metadata.max.age.ms=5000, default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor, bootstrap.servers=kafka:9092, application.id=machine-event-processor, default.value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde}
val builder: StreamsBuilder = new StreamsBuilder
val topicStream: KStream[String, Array[Byte]] = builder.stream(Pattern.compile("app.machine-events.*"))
topicStream.process(new MessageProcessorSupplier(context)) // The event is delegated to a processor, doing the actual processing logic
val eventStreams = new KafkaStreams(builder.build(), streamConfig)
eventStreams.start()
备注
正在使用 Kafka-streams 2.0.0:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.0.0</version> </dependency>
Kafka 正在 运行 容器内,使用
wurstmeister/kafka:2.11-2.0.0
版本。 docker-stack.yml服务:
kafka:
image: wurstmeister/kafka:2.11-2.0.0
ports:
- target: 9094
published: 9094
protocol: tcp
mode: host
volumes:
- /var/run/docker.sock:/var/run/docker.sock
healthcheck:
test: ["CMD-SHELL", "$$(netstat -ltn | grep -q 9092)"]
interval: 15s
timeout: 10s
retries: 5
environment:
HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 36000
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
deploy:
replicas: 2
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
window: 120s
- Kafka 采用双节点设置,形成一个集群。通过 docker 环境变量,我们将复制因子设置为
2
,因此每个分区应该在每个节点上都有一个复制。
相关topics/questions/discussions我找到并检查了
https://faust.readthedocs.io/en/latest/developerguide/partition_assignor.html
检查了 Kafka mail archives 但没有找到任何东西
全方位搜索其他运行进入这个问题,但没有给我我需要的答案。还发现 KAFKA-7144 但这对我们来说应该不是问题,因为我们正在 运行ning 2.0.0
如果谁有运行类似的问题,或者能指出我可能很愚蠢的错误,请赐教!
为了以后的读者 运行 遇到同样的问题,解决方案是不使用 N 个主题,每个主题都有 1 个分区,而是使用 1 个主题和 N 个分区。即使有 120 个分区和 400+ machines/event-sources,多个事件类型将被放入同一个分区,但这并不影响事件的顺序。
实现是将记录键设置为机器名,并让底层逻辑负责哪个值进入哪个分区。由于我们现在有一个消费者组,其中有 X 个消费者订阅了这个主题,分区将平均分配给消费者,每个分区有 120/X 个分区。
正如 Matthias 所建议的那样,在 Devoxx Belgium 2018 的 Confluent 的其他帮助人员进一步证实了这一点。谢谢!
提示
使用wurstmeister/kafkadocker图像时,考虑使用环境属性:
KAFKA_CREATE_TOPICS: "app.machine-events:120:2"
意思是
topic-name:number-of-partitions:replication-factor