如何创建可从现有变更日志主题恢复的状态存储?

How can I create a state store that is restorable from an existing changelog topic?

我正在使用流 DSL 对名为 users:

的主题进行重复数据删除
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("users"), byteStringSerde, userSerde));                                                                                                                                                   

KStream<ByteString, User> users = topology.stream("users", Consumed.with(byteStringSerde, userSerde));                                                                                                                                                                      

users.transform(() -> new Transformer<ByteString, User, KeyValue<ByteString, User>>() {                                                                                                                                                                                     

    private KeyValueStore<ByteString, User> store;                                                                                                                                                                                                                          

    @Override                                                                                                                                                                                                                                                               
    @SuppressWarnings("unchecked")                                                                                                                                                                                                                                          
    public void init(ProcessorContext context) {                                                                                                                                                                                                                            
        store = (KeyValueStore<ByteString, User>) context.getStateStore("users");                                                                                                                                                                                           
    }                                                                                                                                                                                                                                                                       

    @Override                                                                                                                                                                                                                                                               
    public KeyValue<ByteString, User> transform(ByteString key, User value) {                                                                                                                                                                                               
        User user = store.get(key);                                                                                                                                                                                                                                         
        if (user != null) {                                                                                                                                                                                                                                                 
            store.put(key, value);                                                                                                                                                                                                                                          
            return new KeyValue<>(key, value);                                                                                                                                                                                                                              
        }                                                                                                                                                                                                                                                                   
        return null;                                                                                                                                                                                                                                                        
    }                                                                                                                                                                                                                                                                       

    @Override                                                                                                                                                                                                                                                               
    public KeyValue<ByteString, User> punctuate(long timestamp) {                                                                                                                                                                                                           
        return null;                                                                                                                                                                                                                                                        
    }                                                                                                                                                                                                                                                                       

    @Override                                                                                                                                                                                                                                                               
    public void close() {                                                                                                                                                                                                                                                   

    }                                                                                                                                                                                                                                                                       
}, "users"); 

根据此代码,Kafka Streams 为 users 商店创建了一个内部变更日志主题。我想知道,有没有什么方法可以使用现有的 users 主题,而不是创建一个本质上相同的更新日志主题?

PS。我看到 StreamsBuilder 说这是可能的:

However, no internal changelog topic is created since the original input topic can be used for recovery

但是按照 InternalStreamsBuilder#table()InternalStreamsBuilder#createKTable() 的代码,我看不出它是如何实现这种效果的。

并非 DSL 所做的所有事情都可以在处理器 API 级别实现——它使用了一些不属于 public API 的内部构件来实现您描述的内容。

这是对 InternalTopologyBuilder#connectSourceStoreAndTopic() 的调用(参见 InternalStreamsBuilder#table())。

对于您关于重复数据删除的用例,您似乎需要两个主题(取决于您应用的重复数据删除逻辑)。通过变更日志主题恢复会进行基于键的更新,因此不会考虑值(这也可能是您的重复数据删除逻辑的一部分)。