Kafka 不使用偏移量而是通过记录的字段从主题中删除记录

Kafka delete records from the topic without using offsets but by a field of the record

假设我有一个名为 "batch" 的主题,有 1 个分区,我向它发布了数百万条记录以供处理。我有一个 3 人的消费者组来处理那数百万条记录。我遇到这样一种情况,我不再需要处理满足特定条件的特定消息子集,例如 age < 50

如何以编程方式从主题中删除这些消息。就像我单击 UI 中的 "Cancel" 按钮,它应该从 age < 50 的主题中删除那些记录子集,这样它就不会被消费者处理。

我知道我可以通过 运行 带有偏移量的命令行删除消息:- https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

还有 Java API 但同样是偏移量:

https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html#deleteRecords-java.util.Map-org.apache.kafka.clients.admin.DeleteRecordsOptions-

Delete records whose offset is smaller than the given offset of the corresponding partition

但在我的例子中,我不能使用偏移量,因为我只需要删除某些记录而不是 all records smaller than the given offset

我需要指出的主要一点是,您不应将 Kafka 中的数据视为与数据库中的数据相同的东西。 Kafka 没有被设计成以这种方式工作(例如:当我点击 X 按钮时,Y 记录将被删除)。

相反,您应该将主题视为 never-ending 数据流。为 Kafka 主题生成的每条记录都将由消费者独立消费和处理。

将主题视为流为您提供不同的解决方案:

您可以使用第二个主题,其中包含过滤后的结果!

Streaming Diagram
                            ___ Topic A ____
--  Produced Messages -->  |                |      _______________________
                           |________________| --> |                       |
                                                  | Filtering Application |
                            ___  Topic B ___      |                       |
                           |                | <-- |_______________________|
<-- Consumed Messages --   |________________|

解释很简单,您生成了主题 A 的消息。然后您使用 Filtering Application 将:

  1. 使用主题 A 中的消息
  2. 基于一些业务逻辑(例如:age < 50)将过滤
  3. 将筛选后的消息生成到主题 B

最后,您的消费者将收到来自主题 B 的消息。

现在,在创建过滤应用程序时,您有两种选择:

  1. 使用消费者和生产者实施基本解决方案
  2. 使用Kafka Streams
  3. 使用KSQL

你不能,Kafka 并没有被设计成像数据库一样使用,它实际上是一个 immutable 提交日志。删除记录工具主要用于管理任务。

有一个例外,那就是如果你使用log compaction。如果您有一个紧凑的主题,您可以通过向具有 NULL 值的主题发布记录来删除键的值。压缩主题通常​​像数据库提交日志一样使用,您将它们读入某个下游服务,在那里它像 table 一样具体化。 NULL 值应解析为记录删除。

因此,在您的用例中,您会将主题具体化为针对 SELECT key FROM TABLE WHERE age > 50; 等查询优化的系统,并将每个值为 NULL 的键的记录发布回 Kafka 主题。你甚至可以在主题的开头启动你的消费者并注意哪些记录有 age > 50 并做同样的事情,但效率不会那么高。