在 Streams 应用程序的所有实例中使用主题的所有消息
Consume all messages of a topic in all instances of a Streams app
在 Kafka Streams 应用程序中,实例仅获取已分配给该实例的分区的输入主题的消息。作为 group.id
,它基于(对于所有实例都相同)application.id
,这意味着每个实例只看到主题的一部分。
当然,这一切都非常合理,我们将其用于高吞吐量数据主题,但我们还想通过向输入主题添加主题范围的“控制消息”来控制流应用程序.但是由于所有实例都需要获取这些消息,我们要么必须发送
- 每个分区一个控制消息(使发送者有必要了解分区方案,这是我们希望避免的事情)
- 每个密钥一个控制消息(因此每个活动分区将至少获得一个控制消息)
因为这对发送者来说很麻烦,所以除了数据主题之外,我们正在考虑为流应用程序使用的控制消息创建一个新主题。但是我们如何才能让每个分区都收到来自控制消息主题的所有消息呢?
根据,无法为 Kafka Streams 设置组 ID。
除了使用 Kafka Streams 之外,一种方法是创建和使用 KafkaConsumer
,这样我们就可以根据需要设置组 ID。然而,这听起来很复杂和肮脏,以至于想知道我们是否缺少更直接的方法。
有什么想法吗?
您可以使用从所有分区获取数据的全局存储。
Adds a global StateStore to the topology. The StateStore sources its
data from all partitions of the provided input topic. There will be
exactly one instance of this StateStore per Kafka Streams instance.
语法如下:
public StreamsBuilder addGlobalStore(StoreBuilder storeBuilder,
String topic,
Consumed consumed,
ProcessorSupplier stateUpdateSupplier)
最后一个参数是 ProcessorSupplier
,它有一个 get()
,returns 一个 Processor
,它将为每个新消息执行。 Processor
包含 process()
方法,每次有新消息发送到主题时都会执行该方法。
全局存储每个流实例,因此您可以获取每个流实例中的所有主题数据。
在process(K key, V value)中,你可以编写你的处理逻辑。
全局存储可以是内存中的或持久的,并且可以由更新日志主题支持,因此即使流实例 local 数据(状态)被删除,可以使用更新日志主题构建商店。
在 Kafka Streams 应用程序中,实例仅获取已分配给该实例的分区的输入主题的消息。作为 group.id
,它基于(对于所有实例都相同)application.id
,这意味着每个实例只看到主题的一部分。
当然,这一切都非常合理,我们将其用于高吞吐量数据主题,但我们还想通过向输入主题添加主题范围的“控制消息”来控制流应用程序.但是由于所有实例都需要获取这些消息,我们要么必须发送
- 每个分区一个控制消息(使发送者有必要了解分区方案,这是我们希望避免的事情)
- 每个密钥一个控制消息(因此每个活动分区将至少获得一个控制消息)
因为这对发送者来说很麻烦,所以除了数据主题之外,我们正在考虑为流应用程序使用的控制消息创建一个新主题。但是我们如何才能让每个分区都收到来自控制消息主题的所有消息呢?
根据
除了使用 Kafka Streams 之外,一种方法是创建和使用 KafkaConsumer
,这样我们就可以根据需要设置组 ID。然而,这听起来很复杂和肮脏,以至于想知道我们是否缺少更直接的方法。
有什么想法吗?
您可以使用从所有分区获取数据的全局存储。
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
语法如下:
public StreamsBuilder addGlobalStore(StoreBuilder storeBuilder,
String topic,
Consumed consumed,
ProcessorSupplier stateUpdateSupplier)
最后一个参数是 ProcessorSupplier
,它有一个 get()
,returns 一个 Processor
,它将为每个新消息执行。 Processor
包含 process()
方法,每次有新消息发送到主题时都会执行该方法。
全局存储每个流实例,因此您可以获取每个流实例中的所有主题数据。
在process(K key, V value)中,你可以编写你的处理逻辑。
全局存储可以是内存中的或持久的,并且可以由更新日志主题支持,因此即使流实例 local 数据(状态)被删除,可以使用更新日志主题构建商店。