如何创建可从现有变更日志主题恢复的状态存储?
How can I create a state store that is restorable from an existing changelog topic?
我正在使用流 DSL 对名为 users
:
的主题进行重复数据删除
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("users"), byteStringSerde, userSerde));
KStream<ByteString, User> users = topology.stream("users", Consumed.with(byteStringSerde, userSerde));
users.transform(() -> new Transformer<ByteString, User, KeyValue<ByteString, User>>() {
private KeyValueStore<ByteString, User> store;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store = (KeyValueStore<ByteString, User>) context.getStateStore("users");
}
@Override
public KeyValue<ByteString, User> transform(ByteString key, User value) {
User user = store.get(key);
if (user != null) {
store.put(key, value);
return new KeyValue<>(key, value);
}
return null;
}
@Override
public KeyValue<ByteString, User> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}, "users");
根据此代码,Kafka Streams 为 users
商店创建了一个内部变更日志主题。我想知道,有没有什么方法可以使用现有的 users
主题,而不是创建一个本质上相同的更新日志主题?
PS。我看到 StreamsBuilder
说这是可能的:
However, no internal changelog topic is created since the original input topic can be used for recovery
但是按照 InternalStreamsBuilder#table()
和 InternalStreamsBuilder#createKTable()
的代码,我看不出它是如何实现这种效果的。
并非 DSL 所做的所有事情都可以在处理器 API 级别实现——它使用了一些不属于 public API 的内部构件来实现您描述的内容。
这是对 InternalTopologyBuilder#connectSourceStoreAndTopic()
的调用(参见 InternalStreamsBuilder#table()
)。
对于您关于重复数据删除的用例,您似乎需要两个主题(取决于您应用的重复数据删除逻辑)。通过变更日志主题恢复会进行基于键的更新,因此不会考虑值(这也可能是您的重复数据删除逻辑的一部分)。
我正在使用流 DSL 对名为 users
:
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("users"), byteStringSerde, userSerde));
KStream<ByteString, User> users = topology.stream("users", Consumed.with(byteStringSerde, userSerde));
users.transform(() -> new Transformer<ByteString, User, KeyValue<ByteString, User>>() {
private KeyValueStore<ByteString, User> store;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store = (KeyValueStore<ByteString, User>) context.getStateStore("users");
}
@Override
public KeyValue<ByteString, User> transform(ByteString key, User value) {
User user = store.get(key);
if (user != null) {
store.put(key, value);
return new KeyValue<>(key, value);
}
return null;
}
@Override
public KeyValue<ByteString, User> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}, "users");
根据此代码,Kafka Streams 为 users
商店创建了一个内部变更日志主题。我想知道,有没有什么方法可以使用现有的 users
主题,而不是创建一个本质上相同的更新日志主题?
PS。我看到 StreamsBuilder
说这是可能的:
However, no internal changelog topic is created since the original input topic can be used for recovery
但是按照 InternalStreamsBuilder#table()
和 InternalStreamsBuilder#createKTable()
的代码,我看不出它是如何实现这种效果的。
并非 DSL 所做的所有事情都可以在处理器 API 级别实现——它使用了一些不属于 public API 的内部构件来实现您描述的内容。
这是对 InternalTopologyBuilder#connectSourceStoreAndTopic()
的调用(参见 InternalStreamsBuilder#table()
)。
对于您关于重复数据删除的用例,您似乎需要两个主题(取决于您应用的重复数据删除逻辑)。通过变更日志主题恢复会进行基于键的更新,因此不会考虑值(这也可能是您的重复数据删除逻辑的一部分)。