Kafka Streams - 使用处理器实现加入 Api
Kafka Streams - Implementing Joining Using the Processor Api
我知道可以使用 dsl api 执行连接。由于各种原因,我们需要使用处理器 api。
如何使用处理器 api 实现加入流。我有一些想法,但不认为它们是正确的。
一个处理器有多个源主题。流程接口的基础对象,然后转换为流程方法内的正确类型。
两个处理器,每个处理器都有自己的源主题。每个处理器都获得对其他处理器状态存储的只读访问权限(如果可能的话)。
任何想法 - 我确实在 KStreamImpl 中找到了连接实现,但我无法遵循。也许是关于 dsl 如何做到这一点的解释?
您建议的两种实现方式都是可行的。 Kafka Stream本身使用5个处理器实现stream-stream join:
source1 ---> "state maintainer 1" --> "joiner 1" ----+
| | |
updates "join lookups" |
| | +-----+
| +------+ |
v | v
"state 1" <------|------+ "merger" -->
| | ^
"state 2" <------+ | |
^ | +-----+
| | |
updates "join lookups" |
| | |
source2 ---> "state maintainer 2" --> "joiner 2" ----+
左右管道对称。两者都有 "state maintainer" 和 "joiner" Processor
。 "State maintainer" 具有对该状态的写入权限。 "Joiner" as read access to other state.最后一步,将两个连接结果流合并在一起。
我知道可以使用 dsl api 执行连接。由于各种原因,我们需要使用处理器 api。
如何使用处理器 api 实现加入流。我有一些想法,但不认为它们是正确的。
一个处理器有多个源主题。流程接口的基础对象,然后转换为流程方法内的正确类型。
两个处理器,每个处理器都有自己的源主题。每个处理器都获得对其他处理器状态存储的只读访问权限(如果可能的话)。
任何想法 - 我确实在 KStreamImpl 中找到了连接实现,但我无法遵循。也许是关于 dsl 如何做到这一点的解释?
您建议的两种实现方式都是可行的。 Kafka Stream本身使用5个处理器实现stream-stream join:
source1 ---> "state maintainer 1" --> "joiner 1" ----+
| | |
updates "join lookups" |
| | +-----+
| +------+ |
v | v
"state 1" <------|------+ "merger" -->
| | ^
"state 2" <------+ | |
^ | +-----+
| | |
updates "join lookups" |
| | |
source2 ---> "state maintainer 2" --> "joiner 2" ----+
左右管道对称。两者都有 "state maintainer" 和 "joiner" Processor
。 "State maintainer" 具有对该状态的写入权限。 "Joiner" as read access to other state.最后一步,将两个连接结果流合并在一起。