将消息加入 Kafka Stream 后从流中清除消息
Purging messages from stream after joining them in Kafka Stream
我正在使用 Kafka Streams 通过密钥加入来自两个不同 Kafka 主题的两种不同类型的消息。我正在使用 Sliding time window。此 window 策略保留来自流的信息,其类型与消息是否加入了某些内容无关。
在输入流吞吐量非常高的情况下,Kafka 创建的用于执行连接的主题会增长得非常快,消耗大量磁盘 space。
是否可以在加入后清除上述主题中的消息?这样,我将假设一条消息最多与另一条具有相同密钥的消息连接一次。
非常感谢。
0.11.0.0 在 AdminClient 中引入了一个新的 API deleteRecords
和一个名为 kafka-delete-records
的脚本,可用于删除给定偏移量之前的所有记录。您可以使用它们清除不再需要的数据。
详情见KIP-107。
更新
从 2.4.0 版本开始,您可以通过 StreamJoined
参数配置流-流连接(参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join)。
您可以通过 Stores
工厂 class 创建一个 WindowedStoreSupplier
,并在传递给 join()
方法的 StreamJoined
对象上指定供应商.
原答案
您可以通过 until()
参数减少保留时间:
stream1.join(stream2, JoinWindows.of(...).until(/*put retention time here*/);
指定的保留时间将用于本地存储以及基础更新日志主题。请注意,如果更改日志主题已经存在,更改 until()
将 不会 更新主题配置——您需要手动更新主题配置。
我正在使用 Kafka Streams 通过密钥加入来自两个不同 Kafka 主题的两种不同类型的消息。我正在使用 Sliding time window。此 window 策略保留来自流的信息,其类型与消息是否加入了某些内容无关。
在输入流吞吐量非常高的情况下,Kafka 创建的用于执行连接的主题会增长得非常快,消耗大量磁盘 space。
是否可以在加入后清除上述主题中的消息?这样,我将假设一条消息最多与另一条具有相同密钥的消息连接一次。
非常感谢。
0.11.0.0 在 AdminClient 中引入了一个新的 API deleteRecords
和一个名为 kafka-delete-records
的脚本,可用于删除给定偏移量之前的所有记录。您可以使用它们清除不再需要的数据。
详情见KIP-107。
更新
从 2.4.0 版本开始,您可以通过 StreamJoined
参数配置流-流连接(参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join)。
您可以通过 Stores
工厂 class 创建一个 WindowedStoreSupplier
,并在传递给 join()
方法的 StreamJoined
对象上指定供应商.
原答案
您可以通过 until()
参数减少保留时间:
stream1.join(stream2, JoinWindows.of(...).until(/*put retention time here*/);
指定的保留时间将用于本地存储以及基础更新日志主题。请注意,如果更改日志主题已经存在,更改 until()
将 不会 更新主题配置——您需要手动更新主题配置。