Kafka Streams 本地状态存储
Kafka Streams local state stores
我有一个简单的流应用程序,将一个主题作为输入流并将 KeyValues 转换为另一个主题,例如:
StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
Serdes.Long(), CATEGORY_JSON_SERDE);
streamsBuilder.addStateStore(builder)
.stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
.transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);
static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {
static final String STORE_NAME = "test-store";
private KeyValueStore<Long, CategoryDto> store;
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
}
@Override
public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
store.put(key, value);
return KeyValue.pair(key, value);
}
@Override
public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}
这里我不得不使用转换器,因为我需要获取存储并更新相关值。
问题是使用本地状态存储与仅将值放入 ForeachAction
中的简单 HashMap
之间有什么区别?
在这种情况下使用本地状态存储有什么优势?
虽然您的代码中没有显示,但我假设您以某种方式读取并使用了存储的状态。
使用简单的(在内存中)存储你的状态 HashMap
使你的状态根本不持久,这意味着当以下任一情况发生时你的状态将丢失(这些都不是异常的,假设它会经常发生):
- 你的流 processor/applications 停止,
- 崩溃,或
- 由于重新平衡,部分迁移到其他地方(其他 JVM)。
non-persistent 状态的问题在于,当上述任何情况发生时,kafka-streams 将在最后提交的偏移处重新启动处理。因此,在 crash/stop/rebalance 之前处理的所有记录都不会被重新处理,这意味着当处理重新开始时,您的 HashMap
的内容将为空。这肯定不是你想要的。
另一方面,如果您使用其中一个提供的状态存储,kafka-streams 将确保,一旦处理在上面列出的任何中断后重新启动,状态将可用,就像处理从未停止,不重新处理任何以前处理过的记录。
The question is what is the difference between using local state stores, and just putting values to a simple HashMap inside a ForeachAction?
如果您的输入主题未分区并且您 运行 Streams 应用程序的单个实例,则本地状态 API 的价值并不大。在这种情况下——当然:你可以在你的处理器中使用 HashMap
,或者如果你想在重启后继续使用 some persistent HashMap
。
当您的主题被分区时,本地存储的价值变得清晰,当您 运行 Streams 应用程序的多个实例时,本地存储的价值变得更加清晰。在这种情况下,您需要维护处理特定分区的处理器的特定状态,并且该状态需要能够随处理器移动,以防它移动到不同的 Streams 实例。在这种情况下(又名规模),本地存储设施既是必要的又是无价的。想象一下,与将此设施作为核心平台的一部分(本地状态 API)相比,必须自己大规模地进行编排。
我有一个简单的流应用程序,将一个主题作为输入流并将 KeyValues 转换为另一个主题,例如:
StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
Serdes.Long(), CATEGORY_JSON_SERDE);
streamsBuilder.addStateStore(builder)
.stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
.transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);
static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {
static final String STORE_NAME = "test-store";
private KeyValueStore<Long, CategoryDto> store;
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
}
@Override
public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
store.put(key, value);
return KeyValue.pair(key, value);
}
@Override
public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}
这里我不得不使用转换器,因为我需要获取存储并更新相关值。
问题是使用本地状态存储与仅将值放入 ForeachAction
中的简单 HashMap
之间有什么区别?
在这种情况下使用本地状态存储有什么优势?
虽然您的代码中没有显示,但我假设您以某种方式读取并使用了存储的状态。
使用简单的(在内存中)存储你的状态 HashMap
使你的状态根本不持久,这意味着当以下任一情况发生时你的状态将丢失(这些都不是异常的,假设它会经常发生):
- 你的流 processor/applications 停止,
- 崩溃,或
- 由于重新平衡,部分迁移到其他地方(其他 JVM)。
non-persistent 状态的问题在于,当上述任何情况发生时,kafka-streams 将在最后提交的偏移处重新启动处理。因此,在 crash/stop/rebalance 之前处理的所有记录都不会被重新处理,这意味着当处理重新开始时,您的 HashMap
的内容将为空。这肯定不是你想要的。
另一方面,如果您使用其中一个提供的状态存储,kafka-streams 将确保,一旦处理在上面列出的任何中断后重新启动,状态将可用,就像处理从未停止,不重新处理任何以前处理过的记录。
The question is what is the difference between using local state stores, and just putting values to a simple HashMap inside a ForeachAction?
如果您的输入主题未分区并且您 运行 Streams 应用程序的单个实例,则本地状态 API 的价值并不大。在这种情况下——当然:你可以在你的处理器中使用 HashMap
,或者如果你想在重启后继续使用 some persistent HashMap
。
当您的主题被分区时,本地存储的价值变得清晰,当您 运行 Streams 应用程序的多个实例时,本地存储的价值变得更加清晰。在这种情况下,您需要维护处理特定分区的处理器的特定状态,并且该状态需要能够随处理器移动,以防它移动到不同的 Streams 实例。在这种情况下(又名规模),本地存储设施既是必要的又是无价的。想象一下,与将此设施作为核心平台的一部分(本地状态 API)相比,必须自己大规模地进行编排。