Kafka Streams - all instances local store pointing to the same topic

We have the following problem:

We want to listen to a Kafka topic and build its "history": for a given key, extract some data, append it to the existing list for that key (or create a new one if none exists), and put it into another topic, which has only one partition and is highly compacted. Another app can then simply listen to that topic and update its history list.

I was wondering how this fits into the Kafka Streams library. We could of course use aggregation:

// identity map, then group by key and fold each key's values into a "|"-separated string
msgReceived.map((key, word) -> new KeyValue<>(key, word))
           .groupBy((k, v) -> k, stringSerde, stringSerde)
           .aggregate(String::new,
                      (k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
                      stringSerde, "summaries2")
           .to(stringSerde, stringSerde, "transaction-summary50");

This creates a local store backed by Kafka and uses it as the history table.
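To make the aggregation step concrete, here is a minimal sketch of what it computes, written in plain Java outside of Kafka (class and method names here are illustrative, not part of the Kafka Streams API): each key starts from an empty string (`String::new`) and every incoming value is appended as `"|" + value`.

```java
import java.util.HashMap;
import java.util.Map;

public class HistoryFold {
    // Folds (key, value) pairs the same way the aggregate above does:
    // initializer String::new, then agg + "|" + v for each record.
    static Map<String, String> buildHistory(String[][] records) {
        Map<String, String> summaries = new HashMap<>();
        for (String[] kv : records) {
            String key = kv[0], value = kv[1];
            summaries.put(key, summaries.getOrDefault(key, "") + "|" + value);
        }
        return summaries;
    }

    public static void main(String[] args) {
        String[][] records = { {"ACME", "buy"}, {"ACME", "sell"}, {"GLOBEX", "buy"} };
        Map<String, String> history = buildHistory(records);
        System.out.println(history.get("ACME"));   // prints "|buy|sell"
        System.out.println(history.get("GLOBEX")); // prints "|buy"
    }
}
```

Note that the initializer is the empty string, so every summary starts with a leading "|".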

My concern is that if we decide to scale such an app, each running instance will create a new backing topic ${applicationId}-${storeName}-changelog (I assume each app has a different applicationId). Each instance starts consuming the input topic, gets a different set of keys, and builds a different subset of the state. If Kafka decides to rebalance, some instances will start to miss some historic state in their local stores, since they get a completely new set of partitions to consume from.

The question is: if I simply set the same applicationId for every running instance, will each of them eventually replay all the data from the same Kafka topic and end up with the same local state?

Why would you create multiple apps with different IDs to perform the same job? The way Kafka Streams achieves parallelism is through tasks:

An application’s processor topology is scaled by breaking it into multiple tasks.

More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application.

Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process messages one-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention.

If you need to scale your app, you can start new instances running the same application (same application ID), and some of the already assigned tasks will be reassigned to the new instances. The migration of the local state stores is handled automatically by the library:

When the re-assignment occurs, some partitions – and hence their corresponding tasks including any local state stores – will be “migrated” from the existing threads to the newly added threads. As a result, Kafka Streams has effectively rebalanced the workload among instances of the application at the granularity of Kafka topic partitions.

I recommend you take a look at this guide.

My concern is, if we decide to scale such app, each running instance will create a new backing topic ${applicationId}-${storeName}-changelog (I assume each app has a different applicationId). Each instance starts to consume the input topic, gets a different set of keys and builds a different subset of the state. If Kafka decides to rebalance, some instances will start to miss some historic states in local store as they get a completely new set of partitions to consume from.

Some of your assumptions are not correct:

  • If you run multiple instances of your application to scale it, all of them must have the same application ID (cf. Kafka's consumer group management protocol) -- otherwise, the load will not be shared, because each instance will be treated as an application of its own, and each instance will get all partitions assigned.

Thus, if all instances use the same application ID, all running application instances will use the same changelog topic name, and therefore what you intend to do should work out-of-the-box.
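To illustrate, here is a minimal sketch of how every instance would share one application ID (using plain property-key strings rather than the StreamsConfig constants, and a hypothetical id "history-builder"):

```java
import java.util.Properties;

public class SharedAppIdConfig {
    // Every instance of the scaled application must use the SAME application.id;
    // Kafka Streams derives the consumer group and the changelog topic name
    // (<application.id>-<store-name>-changelog) from it.
    static Properties streamsConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "history-builder"); // hypothetical id, identical on all instances
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        Properties instanceA = streamsConfig("broker1:9092");
        Properties instanceB = streamsConfig("broker2:9092");
        // Same application.id => same consumer group => the input partitions
        // (and their tasks, including local state stores) are split between
        // the instances instead of each instance getting all partitions.
        System.out.println(instanceA.getProperty("application.id")
                .equals(instanceB.getProperty("application.id"))); // prints "true"
    }
}
```

Each instance would then pass this Properties object to its KafkaStreams constructor.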