kafka 流实例上状态存储和分区的混合
Mix of State Stores and Partitions on kafka stream instances
我构建了一个带有状态存储的 kafka 流应用程序。现在我正在尝试扩展这个应用程序。当运行 三个不同服务器上的应用程序 Kafka 随机拆分分区和状态存储。
例如:
Instance1 获取:分区 0、分区 1
Instance2 获取:partition-2,stateStore-repartition-0
Instance3 获取:stateStore-repartition-1、stateStore-repartition-2
我想为每个实例分配一个stateStore 和一个分区。我做错了什么?
我的 KafkaStreams 配置:
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
try {
properties.setProperty(StreamsConfig.STATE_DIR_CONFIG,
Files.createTempDirectory(stateStoreName).toAbsolutePath().toString());
} catch (final IOException e) {
// use the default one
}
我的直播是:
stream.groupByKey()
.windowedBy(TimeWindows.of(timeWindowDuration))
.<TradeStats>aggregate(
() -> new TradeStats(),
(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as(stateStoreName)
.withValueSerde(new TradeStatsSerde()))
.toStream();
到目前为止我所看到的(正如我对你的问题的评论中提到的,请分享你的状态存储定义),一切都很好,我怀疑你对这个问题有轻微的误解
What am I doing wrong?
基本上没有。 :-)
对于您的问题的分区部分:它们根据配置的分配器(咨询https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html或相邻接口)分布在消费者周围。
对于您问题的 state store 部分:可能这里存在一些关于(在内存中)状态存储如何工作的误解:它们通常由 Kafka 主题支持,该主题不驻留在您的应用程序主机上,而是驻留在 Kafka 集群本身中。更准确地说,整个状态存储的一部分存在于每个应用程序主机上的 (RocksDB) 内存 key/value 存储中,就像您在问题中的状态存储分配中显示的那样。然而,这些只是 部分 或在 Kafka 集群中维护的完整状态存储的切片。
所以简而言之:一切都很好,让 Kafka 完成分配工作,只有当你有真正特殊的用例或充分的理由时才干涉它。 :-) Kafka 还确保正确的冗余和所有分区的重新平衡,以防您的应用程序主机中断。
如果您仍然想自己分配一些东西,这个用例会很有趣,需要进一步的帮助。
我构建了一个带有状态存储的 kafka 流应用程序。现在我正在尝试扩展这个应用程序。当运行 三个不同服务器上的应用程序 Kafka 随机拆分分区和状态存储。
例如:
Instance1 获取:分区 0、分区 1
Instance2 获取:partition-2,stateStore-repartition-0
Instance3 获取:stateStore-repartition-1、stateStore-repartition-2
我想为每个实例分配一个stateStore 和一个分区。我做错了什么?
我的 KafkaStreams 配置:
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
try {
properties.setProperty(StreamsConfig.STATE_DIR_CONFIG,
Files.createTempDirectory(stateStoreName).toAbsolutePath().toString());
} catch (final IOException e) {
// use the default one
}
我的直播是:
stream.groupByKey()
.windowedBy(TimeWindows.of(timeWindowDuration))
.<TradeStats>aggregate(
() -> new TradeStats(),
(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as(stateStoreName)
.withValueSerde(new TradeStatsSerde()))
.toStream();
到目前为止我所看到的(正如我对你的问题的评论中提到的,请分享你的状态存储定义),一切都很好,我怀疑你对这个问题有轻微的误解
What am I doing wrong?
基本上没有。 :-)
对于您的问题的分区部分:它们根据配置的分配器(咨询https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html或相邻接口)分布在消费者周围。
对于您问题的 state store 部分:可能这里存在一些关于(在内存中)状态存储如何工作的误解:它们通常由 Kafka 主题支持,该主题不驻留在您的应用程序主机上,而是驻留在 Kafka 集群本身中。更准确地说,整个状态存储的一部分存在于每个应用程序主机上的 (RocksDB) 内存 key/value 存储中,就像您在问题中的状态存储分配中显示的那样。然而,这些只是 部分 或在 Kafka 集群中维护的完整状态存储的切片。
所以简而言之:一切都很好,让 Kafka 完成分配工作,只有当你有真正特殊的用例或充分的理由时才干涉它。 :-) Kafka 还确保正确的冗余和所有分区的重新平衡,以防您的应用程序主机中断。
如果您仍然想自己分配一些东西,这个用例会很有趣,需要进一步的帮助。