使用 Kafka Streams 进行简单分类
Simple classification with Kafka Streams
我目前正在尝试寻找一种直接且高效的方法来使用 Kafka Streams 对记录进行分类。
所有记录至少包含一个 id
和一个 failed
属性.
(id
只是一个字符串,failed
是布尔值)
想法是,一开始,将所有传入记录分类为 "message"。
一旦其中一条传入记录设置了失败字段,这应该是 "persisted" 某处并且该记录应分类为 "failure".
从现在开始,每个具有相同 id
的传入记录也应归类为 "failure",无论是否设置了 failed
属性。
我正在考虑使用 Kafka Streams 的内部状态存储(连同交互式查询功能)或每次有记录进入时都会查询的外部数据库。我认为 Kafka 本身的状态存储听起来像是一个更轻量级的解决方案。
这是一个小概念草图,希望能帮助理解问题。
有人知道如何以正确的方式解决这个问题吗?
谢谢
一切顺利
-蒂姆
我觉得你的方法不错。不要认为你需要 IQ 功能。只需定义一个自定义 Transformer
并为其附加一个键值存储。在处理过程中,如果您收到带有 failed=true
的消息,您会将 ID 放入商店。对于带有 failed=false
的每条传入消息,您还检查商店以检查是否存在具有相同 ID 的先前失败消息。
要保留失败的消息,您只需将流分成两部分(可以使用 branch()
并将 failed
消息写入一个特殊主题。
我目前正在尝试寻找一种直接且高效的方法来使用 Kafka Streams 对记录进行分类。
所有记录至少包含一个 id
和一个 failed
属性.
(id
只是一个字符串,failed
是布尔值)
想法是,一开始,将所有传入记录分类为 "message"。
一旦其中一条传入记录设置了失败字段,这应该是 "persisted" 某处并且该记录应分类为 "failure".
从现在开始,每个具有相同 id
的传入记录也应归类为 "failure",无论是否设置了 failed
属性。
我正在考虑使用 Kafka Streams 的内部状态存储(连同交互式查询功能)或每次有记录进入时都会查询的外部数据库。我认为 Kafka 本身的状态存储听起来像是一个更轻量级的解决方案。
这是一个小概念草图,希望能帮助理解问题。
有人知道如何以正确的方式解决这个问题吗?
谢谢 一切顺利 -蒂姆
我觉得你的方法不错。不要认为你需要 IQ 功能。只需定义一个自定义 Transformer
并为其附加一个键值存储。在处理过程中,如果您收到带有 failed=true
的消息,您会将 ID 放入商店。对于带有 failed=false
的每条传入消息,您还检查商店以检查是否存在具有相同 ID 的先前失败消息。
要保留失败的消息,您只需将流分成两部分(可以使用 branch()
并将 failed
消息写入一个特殊主题。