kafka 流实例上状态存储和分区的混合

Mix of State Stores and Partitions on kafka stream instances

我构建了一个带有状态存储的 kafka 流应用程序。现在我正在尝试扩展这个应用程序。当运行 三个不同服务器上的应用程序 Kafka 随机拆分分区和状态存储。

例如:

Instance1 获取:分区 0、分区 1

Instance2 获取:partition-2,stateStore-repartition-0

Instance3 获取:stateStore-repartition-1、stateStore-repartition-2

我想为每个实例分配一个stateStore 和一个分区。我做错了什么?

我的 KafkaStreams 配置:

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);

try {
     properties.setProperty(StreamsConfig.STATE_DIR_CONFIG,
           Files.createTempDirectory(stateStoreName).toAbsolutePath().toString());
} catch (final IOException e) {
         // use the default one
}

我的直播是:

stream.groupByKey()
       .windowedBy(TimeWindows.of(timeWindowDuration))
       .<TradeStats>aggregate(
           () -> new TradeStats(),
           (k, v, tradestats) -> tradestats.add(v),
           Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as(stateStoreName)
        .withValueSerde(new TradeStatsSerde()))
        .toStream();

到目前为止我所看到的(正如我对你的问题的评论中提到的,请分享你的状态存储定义),一切都很好,我怀疑你对这个问题有轻微的误解

What am I doing wrong?

基本上没有。 :-)

对于您的问题的分区部分:它们根据配置的分配器(咨询https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html或相邻接口)分布在消费者周围。

对于您问题的 state store 部分:可能这里存在一些关于(在内存中)状态存储如何工作的误解:它们通常由 Kafka 主题支持,该主题不驻留在您的应用程序主机上,而是驻留在 Kafka 集群本身中。更准确地说,整个状态存储的一部分存在于每个应用程序主机上的 (RocksDB) 内存 key/value 存储中,就像您在问题中的状态存储分配中显示的那样。然而,这些只是 部分 或在 Kafka 集群中维护的完整状态存储的切片。

所以简而言之:一切都很好,让 Kafka 完成分配工作,只有当你有真正特殊的用例或充分的理由时才干涉它。 :-) Kafka 还确保正确的冗余和所有分区的重新平衡,以防您的应用程序主机中断。

如果您仍然想自己分配一些东西,这个用例会很有趣,需要进一步的帮助。