Kafka Streams - 定义具有容错性的自定义 Relational/Non_Key_Value StateStore
Kafka Streams - Define Custom Relational/Non_Key_Value StateStore With Fault Tolerance
我正在尝试使用 kafka 实现事件溯源。
我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:
- "presentation" 层替换为(由?)Kafka 流 API。
- 拓扑中的处理器 API 使用业务逻辑层。
- 此外,该数据库是一个关系型 H2 内存数据库,可通过 Spring Data JPA 存储库访问。存储库还实现了必要的接口,以便将它们注册为 Kafka 状态存储以使用好处(恢复和容错)
但我想知道我应该如何实现自定义状态存储部分?
我一直在寻找并且:
有StateStore
& StoreBuilder
等接口。 StoreBuilder
有一个 withLoggingEnabled()
方法;但是如果我启用它,实际的更新和更改日志何时发生?通常这些示例都是键值存储,即使是自定义示例也是如此。如果我不想要键值怎么办? kafka 文档中交互式查询部分的示例并没有削减它。
我知道交互式查询。但它们似乎适合查询而不适合更新;顾名思义。
在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已经改变?
您需要为要使用的实际商店引擎实施 StateStore
。这个界面没有规定任何关于商店的东西,你可以做任何你想做的事情。
您还需要实施一个 StoreBuilder
作为工厂来创建自定义商店的实例。
MyCustomStore implements StateStore {
// define any interface you want to present to the user of the store
}
MyCustomStoreBuilder implements StoreBuilder<MyCustomStore> {
MyCustomStore builder() {
// create new instance of MyCustomStore and return it
}
// all other methods (except `name()`) are optional
// eg, you can do a dummy implementation that only returns `this`
}
But if I don't use key value; when & how do I inform kafka that my state has changed?
如果您想实施 withLoggingEnabled()
(类似于缓存),您需要将此日志记录(或缓存)作为商店的一部分来实施。因为,Kafka Streams 不知道你的 store 是如何工作的,它不能为此提供一个实现。因此,这是您的设计决定,您的商店是否支持登录到更改日志主题。如果你想支持日志记录,你需要想出一个设计,将存储更新映射到键值对(你也可以为每个更新写多个),你可以写入一个变更日志主题,并允许你重新创建状态当从更改日志主题中读取这些记录时。
获取容错存储不仅可以通过更改日志记录来实现。例如,您还可以插入一个远程存储,它在内部进行复制等,因此依赖存储的容错能力而不是使用更改日志记录。当然,与使用本地商店相比,使用远程商店意味着其他挑战。
对于 Kafka Streams 默认存储,日志记录和缓存实现为实际存储的包装器,使其易于插入。但是您可以采用最适合您商店的任何方式来实现这一点。您可能想查看以下 类 键值存储作为比较:
- https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
- https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
- https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
对于交互式查询,您实施相应的 QueryableStoreType
来集成您的自定义商店。比照。 https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores 没错,Interactive Queries 是现有商店的只读接口,因为 Processors
应该负责维护商店。但是,也没有什么能阻止您打开自定义存储以进行写入。但是,这将使您的应用程序本质上是不确定的,因为如果您倒回输入主题并重新处理它,它可能会计算出不同的结果,具体取决于执行的 "external store writes"。您应该考虑通过输入主题对商店进行任何写入。但这是你的决定。如果您允许 "external writes" 您将需要确保它们也被记录下来,以防您想要实现日志记录。
我正在尝试使用 kafka 实现事件溯源。
我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:
- "presentation" 层替换为(由?)Kafka 流 API。
- 拓扑中的处理器 API 使用业务逻辑层。
- 此外,该数据库是一个关系型 H2 内存数据库,可通过 Spring Data JPA 存储库访问。存储库还实现了必要的接口,以便将它们注册为 Kafka 状态存储以使用好处(恢复和容错)
但我想知道我应该如何实现自定义状态存储部分?
我一直在寻找并且:
有
StateStore
&StoreBuilder
等接口。StoreBuilder
有一个withLoggingEnabled()
方法;但是如果我启用它,实际的更新和更改日志何时发生?通常这些示例都是键值存储,即使是自定义示例也是如此。如果我不想要键值怎么办? kafka 文档中交互式查询部分的示例并没有削减它。我知道交互式查询。但它们似乎适合查询而不适合更新;顾名思义。
在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已经改变?
您需要为要使用的实际商店引擎实施 StateStore
。这个界面没有规定任何关于商店的东西,你可以做任何你想做的事情。
您还需要实施一个 StoreBuilder
作为工厂来创建自定义商店的实例。
MyCustomStore implements StateStore {
// define any interface you want to present to the user of the store
}
MyCustomStoreBuilder implements StoreBuilder<MyCustomStore> {
MyCustomStore builder() {
// create new instance of MyCustomStore and return it
}
// all other methods (except `name()`) are optional
// eg, you can do a dummy implementation that only returns `this`
}
But if I don't use key value; when & how do I inform kafka that my state has changed?
如果您想实施 withLoggingEnabled()
(类似于缓存),您需要将此日志记录(或缓存)作为商店的一部分来实施。因为,Kafka Streams 不知道你的 store 是如何工作的,它不能为此提供一个实现。因此,这是您的设计决定,您的商店是否支持登录到更改日志主题。如果你想支持日志记录,你需要想出一个设计,将存储更新映射到键值对(你也可以为每个更新写多个),你可以写入一个变更日志主题,并允许你重新创建状态当从更改日志主题中读取这些记录时。
获取容错存储不仅可以通过更改日志记录来实现。例如,您还可以插入一个远程存储,它在内部进行复制等,因此依赖存储的容错能力而不是使用更改日志记录。当然,与使用本地商店相比,使用远程商店意味着其他挑战。
对于 Kafka Streams 默认存储,日志记录和缓存实现为实际存储的包装器,使其易于插入。但是您可以采用最适合您商店的任何方式来实现这一点。您可能想查看以下 类 键值存储作为比较:
- https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
- https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
- https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
对于交互式查询,您实施相应的 QueryableStoreType
来集成您的自定义商店。比照。 https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores 没错,Interactive Queries 是现有商店的只读接口,因为 Processors
应该负责维护商店。但是,也没有什么能阻止您打开自定义存储以进行写入。但是,这将使您的应用程序本质上是不确定的,因为如果您倒回输入主题并重新处理它,它可能会计算出不同的结果,具体取决于执行的 "external store writes"。您应该考虑通过输入主题对商店进行任何写入。但这是你的决定。如果您允许 "external writes" 您将需要确保它们也被记录下来,以防您想要实现日志记录。