当我们必须动态获取数据时,Kafka 流丰富
Kafka stream enrichment when we must fetch data on the fly
我们有一连串的传入事件:
{ user: 123, itemid: 'abc', ... }
{ user: 456, itemid: 'abc', ... }
{ user: 123, itemid: 'def', ... }
{ user: 789, itemid: 'xyz', ... }
{ user: 123, itemid: 'xyz', ... }
等等。我们需要通过项目 ID(Web 服务调用)查找的数据来丰富这些,但是
查找可能很慢。另一方面,项目数据是非常静态的,
所以缓存一天不会有问题。
现在,标准的解决方案是将项目数据发布到 KTable 并加入,但是...
我们无法获得所有项目 - 我们从后端系统获得这些,不能只做 "get all"。
并且不会有更新通知。假设项目 ID 是书的 ISBN 号 -
我们不能要求一份世界上所有书籍的清单。而图书信息只会慢慢变化
(例如有人修复分类错误),我们不需要为此类更改重新发布事件。
那么,我们该怎么做呢?
我目前的想法是,我们对 itemid 进行分区,与项目 KTable 进行左连接,然后在值连接器中,
如果正确的值为 null,则执行 Web 服务查找,return 结果并将结果发布到项 KTable 主题。
但这会阻止我们查找 'abc' 两次吗?
就规模而言,我们当前的数据集有大约 150 万个单独的项目。
基本思路似乎不错,但我不会使用 KTable
,而是使用自定义状态 Transformer()
来维护状态。 transform()
只有一个输入流,您可以查找 Transformer
的状态。如果查找失败,您转到 RCP,将其放入存储中,并为您的输出记录进行连接。
我们有一连串的传入事件:
{ user: 123, itemid: 'abc', ... }
{ user: 456, itemid: 'abc', ... }
{ user: 123, itemid: 'def', ... }
{ user: 789, itemid: 'xyz', ... }
{ user: 123, itemid: 'xyz', ... }
等等。我们需要通过项目 ID(Web 服务调用)查找的数据来丰富这些,但是 查找可能很慢。另一方面,项目数据是非常静态的, 所以缓存一天不会有问题。
现在,标准的解决方案是将项目数据发布到 KTable 并加入,但是... 我们无法获得所有项目 - 我们从后端系统获得这些,不能只做 "get all"。 并且不会有更新通知。假设项目 ID 是书的 ISBN 号 - 我们不能要求一份世界上所有书籍的清单。而图书信息只会慢慢变化 (例如有人修复分类错误),我们不需要为此类更改重新发布事件。
那么,我们该怎么做呢? 我目前的想法是,我们对 itemid 进行分区,与项目 KTable 进行左连接,然后在值连接器中, 如果正确的值为 null,则执行 Web 服务查找,return 结果并将结果发布到项 KTable 主题。 但这会阻止我们查找 'abc' 两次吗?
就规模而言,我们当前的数据集有大约 150 万个单独的项目。
基本思路似乎不错,但我不会使用 KTable
,而是使用自定义状态 Transformer()
来维护状态。 transform()
只有一个输入流,您可以查找 Transformer
的状态。如果查找失败,您转到 RCP,将其放入存储中,并为您的输出记录进行连接。