我们使用consumer group通过多个consumers读取broker的数据,但是消费的数据是如何合并的呢?
We read data from brokers through multiple consumers using consumer group, but how the consumed data is combined?
我需要来自 kafka broker 的数据,但是为了快速访问,我使用了多个具有相同组 ID 的消费者,每个消费者阅读后被称为消费者 groups.But,我们如何合并来自多个消费者的数据?有什么逻辑吗?
根据设计,同一消费者组中的不同消费者彼此独立地处理数据。 (此行为允许应用程序很好地扩展。)
But after reading by each consumer,how can we combine data from multiple consumers? Is there any logic?
当您使用 Kafka 的 "Consumer API"(也称为:"consumer client" 库)时,简短但稍微简化的答案,我认为根据您的问题的措辞,您正在使用的是:如果您需要组合来自多个消费者的数据,最简单的选择是让这个(新的)输入数据在另一个 Kafka 主题中可用,您可以在后续处理步骤中进行组合。一个简单的例子是:另一个第二个 Kafka 主题将设置为只有 1 个分区,因此任何后续处理步骤都会看到所有需要组合的数据。
如果这听起来有点太复杂,我建议使用 Kafka 的 Streams API,这样可以更容易地定义此类处理流(例如,连接或聚合,如您的问题)。换句话说,Kafka Streams 为您提供了很多您正在寻找的所需的内置 "logic":https://kafka.apache.org/documentation/streams/
Kafka 的目标是为您提供一个可扩展、高性能和容错的框架。让一组消费者异步读取来自不同分区的数据允许您归档前两个目标。数据分组有点超出标准 Kafka 流程的范围——在最简单的情况下,您可以使用单个消费者实现单个分区,但我确定这不是您想要的。
对于来自不同消费者的单个状态的聚合之类的事情,我建议您应用一些专门为此类目标设计的解决方案。如果您在 Hadoop 方面工作,则可以使用 Storm Trident bolt,它允许您聚合来自 Kafka spout 的数据。或者您可以使用 Spark Streaming,它允许您以稍微不同的方式执行相同的操作。或者作为一种选择,您始终可以使用标准 Kafka 库实现具有此类逻辑的自定义组件。
我需要来自 kafka broker 的数据,但是为了快速访问,我使用了多个具有相同组 ID 的消费者,每个消费者阅读后被称为消费者 groups.But,我们如何合并来自多个消费者的数据?有什么逻辑吗?
根据设计,同一消费者组中的不同消费者彼此独立地处理数据。 (此行为允许应用程序很好地扩展。)
But after reading by each consumer,how can we combine data from multiple consumers? Is there any logic?
当您使用 Kafka 的 "Consumer API"(也称为:"consumer client" 库)时,简短但稍微简化的答案,我认为根据您的问题的措辞,您正在使用的是:如果您需要组合来自多个消费者的数据,最简单的选择是让这个(新的)输入数据在另一个 Kafka 主题中可用,您可以在后续处理步骤中进行组合。一个简单的例子是:另一个第二个 Kafka 主题将设置为只有 1 个分区,因此任何后续处理步骤都会看到所有需要组合的数据。
如果这听起来有点太复杂,我建议使用 Kafka 的 Streams API,这样可以更容易地定义此类处理流(例如,连接或聚合,如您的问题)。换句话说,Kafka Streams 为您提供了很多您正在寻找的所需的内置 "logic":https://kafka.apache.org/documentation/streams/
Kafka 的目标是为您提供一个可扩展、高性能和容错的框架。让一组消费者异步读取来自不同分区的数据允许您归档前两个目标。数据分组有点超出标准 Kafka 流程的范围——在最简单的情况下,您可以使用单个消费者实现单个分区,但我确定这不是您想要的。
对于来自不同消费者的单个状态的聚合之类的事情,我建议您应用一些专门为此类目标设计的解决方案。如果您在 Hadoop 方面工作,则可以使用 Storm Trident bolt,它允许您聚合来自 Kafka spout 的数据。或者您可以使用 Spark Streaming,它允许您以稍微不同的方式执行相同的操作。或者作为一种选择,您始终可以使用标准 Kafka 库实现具有此类逻辑的自定义组件。