只读取来自 kafka 主题的特定消息

reading only specific messages from kafka topic

场景:

我正在将数据 JSON 对象数据写入 kafka 主题,同时阅读我只想根据消息中存在的值读取一组特定的消息。我正在使用 kafka-python 库。

示例消息:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

这里我只想阅读 flow_Status 已完成的消息。

您可以创建两个不同的主题;一个用于完成状态,另一个用于失败状态。然后从已完成的主题中读取消息进行处理。

否则,如果您希望它们在一个主题中并且只想阅读已完成的,我相信您需要将它们全部阅读并使用简单的 if-else 条件忽略失败的。

在 Kafka 中不可能做那样的事情。 消费者一个接一个地消费消息,一个接一个地从最新提交的偏移量开始(或者从头开始,或者寻找特定的偏移量)。 取决于您的用例,也许您的场景中可能会有不同的流程:执行流程的消息进入主题,然后处理操作的应用程序,然后将结果(完成或失败)写入两个不同的主题: 这样你就完成了所有的失败。 另一种方法是使用 Kafka Streams 应用程序进行过滤,但考虑到它只是一个糖,实际上,流应用程序将始终读取所有消息,但允许您轻松过滤消息。

Kafka 消费者不预先支持这种功能。您将必须按顺序使用所有事件,过滤出状态已完成的事件并将其放在某个地方。相反,您可以考虑使用 Kafka Streams 应用程序,您可以在其中将数据作为流读取并过滤 flow_status = "completed" 中的事件,并在某些输出主题或其他目标中发布。

示例:

KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));

P.S。 Kafka 没有针对 KStream Python API 的官方版本,但有开源项目:https://github.com/wintoncode/winton-kafka-streams

截至今天,无法在代理端实现它,有一个 Jira 功能请求开放给 apache kafka 来实现这个功能,你可以在这里跟踪它,我希望他们能在近期实现这个未来: https://issues.apache.org/jira/browse/KAFKA-6020

我觉得最好的方法是使用 RecordFilterStrategy (Java/spring) 接口并在消费者端对其进行过滤。