如何使用标点符号从状态存储中删除旧记录? (卡夫卡)
How to remove old records from a state store using a punctuator? (Kafka)
我使用 streamsBuilder.table("myTopic")
为主题创建了一个 Ktable
,我将其具体化为状态存储,以便我可以使用交互式查询。
每个小时,我都想从此状态存储(和关联的变更日志主题)中删除其值在过去一小时内未更新的记录。
我相信这可以使用 punctuator,但到目前为止我只使用过 DSL,所以不确定如何进行。如果有人能给我一个例子,我将不胜感激。
谢谢,
杰克
可以将处理器 API 与 DSL 混合搭配,但不能处理 KTable。您需要转换为 KStream。或者,您可以使用与状态存储交互的处理器创建新拓扑。
您需要将该状态存储在某处——如何确定记录是否超过一小时。一种选择是为状态存储中的每条记录添加时间戳。
在 Processor 的 init 方法中,您可以调用 schedule (punctuate) 来迭代状态存储中的记录并删除旧的:
context.schedule(Duration.ofMillis(everyHourInMillis), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
myStateStore.all().forEachRemaining(keyValue -> {
if (Instant.ofEpochMilli(valueInStateStore).compareTo(olderThanAnHour) < 0) {
myStateStore.delete(keyValue.key);
}
});
});
我使用 streamsBuilder.table("myTopic")
为主题创建了一个 Ktable
,我将其具体化为状态存储,以便我可以使用交互式查询。
每个小时,我都想从此状态存储(和关联的变更日志主题)中删除其值在过去一小时内未更新的记录。
我相信这可以使用 punctuator,但到目前为止我只使用过 DSL,所以不确定如何进行。如果有人能给我一个例子,我将不胜感激。
谢谢,
杰克
可以将处理器 API 与 DSL 混合搭配,但不能处理 KTable。您需要转换为 KStream。或者,您可以使用与状态存储交互的处理器创建新拓扑。
您需要将该状态存储在某处——如何确定记录是否超过一小时。一种选择是为状态存储中的每条记录添加时间戳。
在 Processor 的 init 方法中,您可以调用 schedule (punctuate) 来迭代状态存储中的记录并删除旧的:
context.schedule(Duration.ofMillis(everyHourInMillis), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
myStateStore.all().forEachRemaining(keyValue -> {
if (Instant.ofEpochMilli(valueInStateStore).compareTo(olderThanAnHour) < 0) {
myStateStore.delete(keyValue.key);
}
});
});