kafka 本地状态存储/变更日志中的保留时间
Retention time in kafka local state store / changelog
我正在使用 Kafka 和 Kafka Streams 作为 Spring Cloud Stream 的一部分。在我的 Kafka Streams 应用程序中流动的数据正在特定时间聚合和具体化 windows:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
oneHour.withLoggingEnabled(topicConfig);
events
.map(getStringSensorMeasurementKeyValueKeyValueMapper())
.groupByKey()
.windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
按照设计,具体化的信息也由更新日志主题支持。
我们的应用程序还有一个 rest 端点,它将像这样查询 statestore:
ReadOnlyWindowStore<String, Double> windowStore = queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
查看创建的更改日志主题的设置,它显示为:
min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1
我假设当地的国有商店至少会将信息保存 61 天(约 2 个月)。然而,商店中似乎只剩下最后一天的数据。
什么会导致数据这么快被删除?
更新解决方案
Kafka Streams 版本 2.0.1 不包含 Materialized.withRetention 方法。对于这个特定版本,我能够使用以下解决我的问题的代码设置状态存储的保留时间:
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
让我的代码写成这样:
...
.groupByKey()
.windowedBy(timeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
...
对于窗口化 KTable
s,有本地保留时间和变更日志保留时间。您可以通过 Materialized.withRetentionTime(...)
设置本地存储保留时间 -- 默认值为 24 小时。
For older Kafka release, the local store retention time is set via Windows#until()
.
如果创建了新应用程序,则创建的更改日志主题的保留时间与本地存储保留时间相同。但是,如果您手动增加日志保留时间,这不会影响您的存储保留时间,但您需要相应地更新您的代码。当变更日志主题已经存在时也是如此:如果您更改本地存储保留时间,变更日志主题配置不会自动更新。
为此也有一个 Jira:https://issues.apache.org/jira/browse/KAFKA-7591
我正在使用 Kafka 和 Kafka Streams 作为 Spring Cloud Stream 的一部分。在我的 Kafka Streams 应用程序中流动的数据正在特定时间聚合和具体化 windows:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
oneHour.withLoggingEnabled(topicConfig);
events
.map(getStringSensorMeasurementKeyValueKeyValueMapper())
.groupByKey()
.windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
按照设计,具体化的信息也由更新日志主题支持。
我们的应用程序还有一个 rest 端点,它将像这样查询 statestore:
ReadOnlyWindowStore<String, Double> windowStore = queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
查看创建的更改日志主题的设置,它显示为:
min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1
我假设当地的国有商店至少会将信息保存 61 天(约 2 个月)。然而,商店中似乎只剩下最后一天的数据。
什么会导致数据这么快被删除?
更新解决方案 Kafka Streams 版本 2.0.1 不包含 Materialized.withRetention 方法。对于这个特定版本,我能够使用以下解决我的问题的代码设置状态存储的保留时间:
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
让我的代码写成这样:
...
.groupByKey()
.windowedBy(timeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
...
对于窗口化 KTable
s,有本地保留时间和变更日志保留时间。您可以通过 Materialized.withRetentionTime(...)
设置本地存储保留时间 -- 默认值为 24 小时。
For older Kafka release, the local store retention time is set via
Windows#until()
.
如果创建了新应用程序,则创建的更改日志主题的保留时间与本地存储保留时间相同。但是,如果您手动增加日志保留时间,这不会影响您的存储保留时间,但您需要相应地更新您的代码。当变更日志主题已经存在时也是如此:如果您更改本地存储保留时间,变更日志主题配置不会自动更新。
为此也有一个 Jira:https://issues.apache.org/jira/browse/KAFKA-7591