Kafka Streams - 处理器 API - 转发到不同的主题
Kafka Streams - Processor API - Forward to different topics
我有一个 Processor-API Processor,它在内部转发到几个独立的接收器(想想一个事件分类器,尽管它在事件之间也有状态逻辑)。我正在考虑稍后加入其中两个主题。加入后,我会将元素的更新(丰富)版本转发到我实际加入的那些主题。
如果在您的处理器 API 中将代码转发到多个接收器(sink1、sink2),这些接收器又被发送到主题,您将如何混合 DSL?
我想你可以创建单独的流,比如
val stream1 = builder.stream(outputTopic)
val stream2 = builder.stream(outputTopic2)
并从那里开始建造?然而,这会产生更多的子拓扑 - 这意味着什么?
另一种可能性是在处理器中拥有自己的状态存储 API 并在同一个处理器中管理它(我实际上正在这样做)。它增加了代码的复杂性,但效率不是更高吗?例如,您可以删除不再使用的数据(一旦建立连接,您可以将新连接的数据转发到接收器,它不再符合连接条件)。还有其他效率陷阱吗?
最简单的方法可能是将处理器 API 与 DSL 混合,方法是从 StreamsBuilder
开始并使用 transform()
StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
.transform(/* put your processor API code here */)
.branch(...);
KStream joined = streams[0].join(streams[1], ...);
也可以先将中间流写入主题再读回。您获得更多子拓扑这一事实应该无关紧要。
可以通过状态手动进行连接,但很难正确编码。如果可能,我建议使用 DSL 中提供的连接运算符。
我有一个 Processor-API Processor,它在内部转发到几个独立的接收器(想想一个事件分类器,尽管它在事件之间也有状态逻辑)。我正在考虑稍后加入其中两个主题。加入后,我会将元素的更新(丰富)版本转发到我实际加入的那些主题。
如果在您的处理器 API 中将代码转发到多个接收器(sink1、sink2),这些接收器又被发送到主题,您将如何混合 DSL?
我想你可以创建单独的流,比如
val stream1 = builder.stream(outputTopic)
val stream2 = builder.stream(outputTopic2)
并从那里开始建造?然而,这会产生更多的子拓扑 - 这意味着什么?
另一种可能性是在处理器中拥有自己的状态存储 API 并在同一个处理器中管理它(我实际上正在这样做)。它增加了代码的复杂性,但效率不是更高吗?例如,您可以删除不再使用的数据(一旦建立连接,您可以将新连接的数据转发到接收器,它不再符合连接条件)。还有其他效率陷阱吗?
最简单的方法可能是将处理器 API 与 DSL 混合,方法是从 StreamsBuilder
开始并使用 transform()
StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
.transform(/* put your processor API code here */)
.branch(...);
KStream joined = streams[0].join(streams[1], ...);
也可以先将中间流写入主题再读回。您获得更多子拓扑这一事实应该无关紧要。
可以通过状态手动进行连接,但很难正确编码。如果可能,我建议使用 DSL 中提供的连接运算符。