Kafka 流使用来自 dsl api 中调用的处理器的上下文转发

Kafka streams using context forward from processor called in dsl api

我有一个处理器,想在其中调用 context.forward()。但是我觉得我需要为它设置一个接收器主题才能真正转发。如果我使用的是拓扑,我只会使用 .addSource()、.addProcessor()、.addSink()。然而,对于 DSL,我有一个 StreamsBuilder/KStream。从 dsl 调用处理器时是否仍然要使用 context.forward()?

注意:我需要使用处理器而不是转换,因为我有关于何时向下转发记录的自定义逻辑。

stream.process(() -> new WindowAggregatorProcessor(storeName), storeName);

stream.process()是DSL中的终端操作。您可以使用 stream.transform() 来获取输出流。 TransformerProcessor 基本相同。