在 Kstreams 中创建和访问更新 table
Create and access Updating table within Kstreams
我想访问数据流 "A",根据 table "B" 检查一些条件,最后更新 table "B" 的状态。
(A,B) ====> check conditions in B ====> finally update B
如何在 kafka 流 中实现相同的逻辑?从文档中我观察到,对于流数据 "A" 我们可以使用 Kstreams,对于更新 table "B" 我们可以使用 Ktable。但 kstreams 和 ktable 之间唯一允许的操作是连接,它不会更新 ktable(而是仅将其用作查找)。它还创建了另一个新的 kstream(我不想要)。简而言之,我想访问 Ktable 就像更新 table 参考传入流一样,那么我们如何在 kafka 流中实现相同的?
KTable
的内容由 Streams API 维护,您不能
直接更新一个KTable
。
我假设您阅读了某个主题的 KTable
;为此,您通过写入主题来更新 table,Kafka Streams 将通过读取主题来更新 table。从主题中读取的 KTable
旨在包含主题的内容。
当然,写入主题不会立即更新 KTable
,而是会有一些延迟(这是异步更新)。如果您需要同步更新,则不能使用 KTable
,但可以使用带有附加状态的 .transform()
。
我想访问数据流 "A",根据 table "B" 检查一些条件,最后更新 table "B" 的状态。
(A,B) ====> check conditions in B ====> finally update B
如何在 kafka 流 中实现相同的逻辑?从文档中我观察到,对于流数据 "A" 我们可以使用 Kstreams,对于更新 table "B" 我们可以使用 Ktable。但 kstreams 和 ktable 之间唯一允许的操作是连接,它不会更新 ktable(而是仅将其用作查找)。它还创建了另一个新的 kstream(我不想要)。简而言之,我想访问 Ktable 就像更新 table 参考传入流一样,那么我们如何在 kafka 流中实现相同的?
KTable
的内容由 Streams API 维护,您不能
直接更新一个KTable
。
我假设您阅读了某个主题的 KTable
;为此,您通过写入主题来更新 table,Kafka Streams 将通过读取主题来更新 table。从主题中读取的 KTable
旨在包含主题的内容。
当然,写入主题不会立即更新 KTable
,而是会有一些延迟(这是异步更新)。如果您需要同步更新,则不能使用 KTable
,但可以使用带有附加状态的 .transform()
。