Kafka Stream 自定义 State Store
Kafka Stream custom State Store
我一直在准备关于 state store 的文档,但我仍然不清楚它是否符合我的目的。我想使用一些分布式图形数据库作为其他外部应用程序可以使用的状态存储。这是否可能,这需要付出哪些努力,任何人都可以指出 class/code 需要扩展才能实现该功能。
您可以使用处理器 API 实现自定义状态存储,如下所述:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- 您的自定义状态存储必须实现 StateStore。
- 您必须有一个界面来表示商店中可用的操作。
- 您必须提供 StoreBuilder 的实现来创建您的商店实例。
- 建议您提供一个限制访问只读操作的接口。这可以防止此 API 的用户带外改变 运行 Kafka Streams 应用程序的状态。
实现看起来像这样:
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
// implementation of the actual store
}
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
void write(K Key, V value);
}
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
V read(K key);
}
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>> {
// implementation of the supplier for MyCustomStore
}
为了使其可查询;
- 提供 QueryableStoreType 的实现。
- 提供一个包装器 class,它可以访问商店的所有底层实例并用于查询。
示例:
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore) {
return stateStore instanceOf MyCustomStore;
}
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
}
}
我一直在准备关于 state store 的文档,但我仍然不清楚它是否符合我的目的。我想使用一些分布式图形数据库作为其他外部应用程序可以使用的状态存储。这是否可能,这需要付出哪些努力,任何人都可以指出 class/code 需要扩展才能实现该功能。
您可以使用处理器 API 实现自定义状态存储,如下所述:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- 您的自定义状态存储必须实现 StateStore。
- 您必须有一个界面来表示商店中可用的操作。
- 您必须提供 StoreBuilder 的实现来创建您的商店实例。
- 建议您提供一个限制访问只读操作的接口。这可以防止此 API 的用户带外改变 运行 Kafka Streams 应用程序的状态。
实现看起来像这样:
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
// implementation of the actual store
}
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
void write(K Key, V value);
}
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
V read(K key);
}
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>> {
// implementation of the supplier for MyCustomStore
}
为了使其可查询;
- 提供 QueryableStoreType 的实现。
- 提供一个包装器 class,它可以访问商店的所有底层实例并用于查询。
示例:
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore) {
return stateStore instanceOf MyCustomStore;
}
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
}
}