按键加入多个Kafka主题
Join multiple Kafka topics by key
如何编写一个以可扩展的方式加入多个 Kafka 主题的消费者?
我有一个主题使用密钥发布事件,第二个主题使用相同的密钥发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题并为出现在两个主题中的子集执行一些额外操作的消费者。
我可以用一个消费者轻松地做到这一点:读取两个主题的所有内容,在本地维护状态,并在为给定键读取两个事件时执行操作。但我需要扩展的解决方案。
理想情况下,我需要将主题联系在一起,以便以相同的方式对它们进行分区,并将分区同步分配给消费者。我该怎么做?
我知道 Kafka Streams 将主题连接在一起,以便将键分配给相同的节点。他们是怎么做到的呢? P.S。我不能使用 Kafka Streams,因为我正在使用 Python。
太糟糕了,你在 Python -- Kafka Streams 将是一个完美的选择:)
如果您想手动执行此操作,则需要实现您自己的 PartitionAssignor
-- 这一点,实现必须确保分区在分配中位于同一位置:假设每个主题有 4 个分区(我们称它们为 A 和 B),分区 A_0 和 B_0 必须分配给同一个消费者(还有 A_1 和 B_1,...)。
我希望 Python consumer 允许您通过配置参数 partition.assignment.strategy
.
指定自定义分区分配器
这是 PartitionAssignor
Kafka Streams 使用的:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
Streams 使用任务的概念——任务获取不同主题的分区,并分配相同的分区号。 Streams 还尝试执行 "sticky assignment" —— 即,如果可能的话,在重新平衡的情况下不要移动任务(以及分区)。因此,每个消费者在重新平衡元数据中对其 "old assignment" 进行编码。
基本上,方法 #subscription()
会在每个存活的消费者上调用。它将消费者的订阅信息(即消费者想要订阅的主题)和可选的元数据发送给代理。
在第二步中,消费者组的负责人将在 #assign()
内计算实际分配。负责经纪人收集 #subscription()
在重新平衡的第一阶段提供的所有信息,并将其交给 #assign()
。因此,leader 可以获得整个组的全局概览,从而可以确保分区以同地分配的方式分配。
在最后一步中,broker 收到来自 leader 的计算分配,并将其广播给组中的所有消费者。这将导致对每个消费者调用 #onAssignment()
。
这也可能有帮助:
如何编写一个以可扩展的方式加入多个 Kafka 主题的消费者?
我有一个主题使用密钥发布事件,第二个主题使用相同的密钥发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题并为出现在两个主题中的子集执行一些额外操作的消费者。
我可以用一个消费者轻松地做到这一点:读取两个主题的所有内容,在本地维护状态,并在为给定键读取两个事件时执行操作。但我需要扩展的解决方案。
理想情况下,我需要将主题联系在一起,以便以相同的方式对它们进行分区,并将分区同步分配给消费者。我该怎么做?
我知道 Kafka Streams 将主题连接在一起,以便将键分配给相同的节点。他们是怎么做到的呢? P.S。我不能使用 Kafka Streams,因为我正在使用 Python。
太糟糕了,你在 Python -- Kafka Streams 将是一个完美的选择:)
如果您想手动执行此操作,则需要实现您自己的 PartitionAssignor
-- 这一点,实现必须确保分区在分配中位于同一位置:假设每个主题有 4 个分区(我们称它们为 A 和 B),分区 A_0 和 B_0 必须分配给同一个消费者(还有 A_1 和 B_1,...)。
我希望 Python consumer 允许您通过配置参数 partition.assignment.strategy
.
这是 PartitionAssignor
Kafka Streams 使用的:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
Streams 使用任务的概念——任务获取不同主题的分区,并分配相同的分区号。 Streams 还尝试执行 "sticky assignment" —— 即,如果可能的话,在重新平衡的情况下不要移动任务(以及分区)。因此,每个消费者在重新平衡元数据中对其 "old assignment" 进行编码。
基本上,方法 #subscription()
会在每个存活的消费者上调用。它将消费者的订阅信息(即消费者想要订阅的主题)和可选的元数据发送给代理。
在第二步中,消费者组的负责人将在 #assign()
内计算实际分配。负责经纪人收集 #subscription()
在重新平衡的第一阶段提供的所有信息,并将其交给 #assign()
。因此,leader 可以获得整个组的全局概览,从而可以确保分区以同地分配的方式分配。
在最后一步中,broker 收到来自 leader 的计算分配,并将其广播给组中的所有消费者。这将导致对每个消费者调用 #onAssignment()
。
这也可能有帮助: