Kafka 不使用偏移量而是通过记录的字段从主题中删除记录
Kafka delete records from the topic without using offsets but by a field of the record
假设我有一个名为 "batch" 的主题,有 1 个分区,我向它发布了数百万条记录以供处理。我有一个 3 人的消费者组来处理那数百万条记录。我遇到这样一种情况,我不再需要处理满足特定条件的特定消息子集,例如 age < 50
如何以编程方式从主题中删除这些消息。就像我单击 UI 中的 "Cancel" 按钮,它应该从 age < 50
的主题中删除那些记录子集,这样它就不会被消费者处理。
我知道我可以通过 运行 带有偏移量的命令行删除消息:- https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
还有 Java API 但同样是偏移量:
Delete records whose offset is smaller than the given offset of the corresponding partition
但在我的例子中,我不能使用偏移量,因为我只需要删除某些记录而不是 all records smaller than the given offset
我需要指出的主要一点是,您不应将 Kafka 中的数据视为与数据库中的数据相同的东西。 Kafka 没有被设计成以这种方式工作(例如:当我点击 X 按钮时,Y 记录将被删除)。
相反,您应该将主题视为 never-ending 数据流。为 Kafka 主题生成的每条记录都将由消费者独立消费和处理。
将主题视为流为您提供不同的解决方案:
您可以使用第二个主题,其中包含过滤后的结果!
Streaming Diagram
___ Topic A ____
-- Produced Messages --> | | _______________________
|________________| --> | |
| Filtering Application |
___ Topic B ___ | |
| | <-- |_______________________|
<-- Consumed Messages -- |________________|
解释很简单,您生成了主题 A 的消息。然后您使用 Filtering Application
将:
- 使用主题 A 中的消息
- 基于一些业务逻辑(例如:
age < 50
)将过滤
- 将筛选后的消息生成到主题 B
最后,您的消费者将收到来自主题 B 的消息。
现在,在创建过滤应用程序时,您有两种选择:
- 使用消费者和生产者实施基本解决方案
- 使用Kafka Streams
- 使用KSQL
你不能,Kafka 并没有被设计成像数据库一样使用,它实际上是一个 immutable 提交日志。删除记录工具主要用于管理任务。
有一个例外,那就是如果你使用log compaction。如果您有一个紧凑的主题,您可以通过向具有 NULL
值的主题发布记录来删除键的值。压缩主题通常像数据库提交日志一样使用,您将它们读入某个下游服务,在那里它像 table 一样具体化。 NULL
值应解析为记录删除。
因此,在您的用例中,您会将主题具体化为针对 SELECT key FROM TABLE WHERE age > 50;
等查询优化的系统,并将每个值为 NULL
的键的记录发布回 Kafka 主题。你甚至可以在主题的开头启动你的消费者并注意哪些记录有 age > 50
并做同样的事情,但效率不会那么高。
假设我有一个名为 "batch" 的主题,有 1 个分区,我向它发布了数百万条记录以供处理。我有一个 3 人的消费者组来处理那数百万条记录。我遇到这样一种情况,我不再需要处理满足特定条件的特定消息子集,例如 age < 50
如何以编程方式从主题中删除这些消息。就像我单击 UI 中的 "Cancel" 按钮,它应该从 age < 50
的主题中删除那些记录子集,这样它就不会被消费者处理。
我知道我可以通过 运行 带有偏移量的命令行删除消息:- https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
还有 Java API 但同样是偏移量:
Delete records whose offset is smaller than the given offset of the corresponding partition
但在我的例子中,我不能使用偏移量,因为我只需要删除某些记录而不是 all records smaller than the given offset
我需要指出的主要一点是,您不应将 Kafka 中的数据视为与数据库中的数据相同的东西。 Kafka 没有被设计成以这种方式工作(例如:当我点击 X 按钮时,Y 记录将被删除)。
相反,您应该将主题视为 never-ending 数据流。为 Kafka 主题生成的每条记录都将由消费者独立消费和处理。
将主题视为流为您提供不同的解决方案:
您可以使用第二个主题,其中包含过滤后的结果!
Streaming Diagram
___ Topic A ____
-- Produced Messages --> | | _______________________
|________________| --> | |
| Filtering Application |
___ Topic B ___ | |
| | <-- |_______________________|
<-- Consumed Messages -- |________________|
解释很简单,您生成了主题 A 的消息。然后您使用 Filtering Application
将:
- 使用主题 A 中的消息
- 基于一些业务逻辑(例如:
age < 50
)将过滤 - 将筛选后的消息生成到主题 B
最后,您的消费者将收到来自主题 B 的消息。
现在,在创建过滤应用程序时,您有两种选择:
- 使用消费者和生产者实施基本解决方案
- 使用Kafka Streams
- 使用KSQL
你不能,Kafka 并没有被设计成像数据库一样使用,它实际上是一个 immutable 提交日志。删除记录工具主要用于管理任务。
有一个例外,那就是如果你使用log compaction。如果您有一个紧凑的主题,您可以通过向具有 NULL
值的主题发布记录来删除键的值。压缩主题通常像数据库提交日志一样使用,您将它们读入某个下游服务,在那里它像 table 一样具体化。 NULL
值应解析为记录删除。
因此,在您的用例中,您会将主题具体化为针对 SELECT key FROM TABLE WHERE age > 50;
等查询优化的系统,并将每个值为 NULL
的键的记录发布回 Kafka 主题。你甚至可以在主题的开头启动你的消费者并注意哪些记录有 age > 50
并做同样的事情,但效率不会那么高。