KafkaStreams state store初始化反复创建和删除RocksDB文件夹
KafkaStreams state store initialization repeatedly creates and deletes RocksDB folders
我将 KafkaStreams 0.10.2.1 与 Windowed RocksDB 状态存储一起使用,我在状态存储初始化期间看到了一个非常奇怪的行为。
在每个任务的状态存储文件夹中,KafkaStreams 正在创建和删除包含 RocksDB 文件的文件夹 30 分钟。
如果状态存储名为 XXX,那么我会看到在名为
的文件夹中创建了文件夹
State Folder/Task ID/XXX
名字如
XXX-201710211345
包含 RocksDB 文件。创建这些文件夹,然后删除并创建具有不同时间戳的新文件夹。这将持续 30 分钟,直到消息处理随之而来。
我猜 RocksDB 正在从状态存储的更改日志主题中重建所有历史状态,但我不明白是为了什么目的,因为它最终删除了除最后一个之外的所有内容。
KafkaStreams 创建和删除这些文件夹的原因是什么?
如何让 KafkaStreams 仅重新创建最新状态?
这是我的拓扑的精简版:
stream
.map((key, value) -> KeyValue.pair(key, value))
.through(Serdes.String(), serde, MY_TOPIC)
.groupByKey(Serdes.String(), serde)
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(windowDurationSec)).until(TimeUnit.SECONDS.toMillis(windowDurationSec) + TimeUnit.SECONDS.toMillis(lateEventGraceTimeSec)), "Hourly_Agg")
.foreach((k, v) -> System.out.println(""));
这是来自 strace 的(一小部分)转储:
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst", {st_mode=S_IFREG|0644, st_size=3158, ...}) = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst") = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = -1 EISDIR (Is a directory)
6552 rmdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = 0
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
6552 mkdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500", 0755) = 0
6552 rename("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG", "/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG.old.1508746634575191") = -1 ENOENT (No such file or directory)
Kafka Streams 确实会重新创建最新状态,您看到的行为是设计使然。
对于 windowed 存储,window 保留时间段被分成所谓的段,Streams 每个段使用一个 RocksDB 来存储相应的数据。这允许根据时间进度 "roll" 分段并有效地删除比保留时间更早的数据(即,删除一个洞 segment/RocksDB)。
重新创建状态时,我们只需阅读整个变更日志主题并将所有这些更新应用到商店。因此,您会看到与处理期间相同的段滚动行为(只是在更短的时间范围内)。要"jump"到最后一个状态并不容易,因为没有足够的前期信息——因此,盲目地重放更新日志是最好的选择。
我将 KafkaStreams 0.10.2.1 与 Windowed RocksDB 状态存储一起使用,我在状态存储初始化期间看到了一个非常奇怪的行为。 在每个任务的状态存储文件夹中,KafkaStreams 正在创建和删除包含 RocksDB 文件的文件夹 30 分钟。
如果状态存储名为 XXX,那么我会看到在名为
的文件夹中创建了文件夹State Folder/Task ID/XXX
名字如
XXX-201710211345
包含 RocksDB 文件。创建这些文件夹,然后删除并创建具有不同时间戳的新文件夹。这将持续 30 分钟,直到消息处理随之而来。 我猜 RocksDB 正在从状态存储的更改日志主题中重建所有历史状态,但我不明白是为了什么目的,因为它最终删除了除最后一个之外的所有内容。
KafkaStreams 创建和删除这些文件夹的原因是什么?
如何让 KafkaStreams 仅重新创建最新状态?
这是我的拓扑的精简版:
stream
.map((key, value) -> KeyValue.pair(key, value))
.through(Serdes.String(), serde, MY_TOPIC)
.groupByKey(Serdes.String(), serde)
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(windowDurationSec)).until(TimeUnit.SECONDS.toMillis(windowDurationSec) + TimeUnit.SECONDS.toMillis(lateEventGraceTimeSec)), "Hourly_Agg")
.foreach((k, v) -> System.out.println(""));
这是来自 strace 的(一小部分)转储:
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst", {st_mode=S_IFREG|0644, st_size=3158, ...}) = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst") = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = -1 EISDIR (Is a directory)
6552 rmdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = 0
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
6552 mkdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500", 0755) = 0
6552 rename("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG", "/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG.old.1508746634575191") = -1 ENOENT (No such file or directory)
Kafka Streams 确实会重新创建最新状态,您看到的行为是设计使然。
对于 windowed 存储,window 保留时间段被分成所谓的段,Streams 每个段使用一个 RocksDB 来存储相应的数据。这允许根据时间进度 "roll" 分段并有效地删除比保留时间更早的数据(即,删除一个洞 segment/RocksDB)。
重新创建状态时,我们只需阅读整个变更日志主题并将所有这些更新应用到商店。因此,您会看到与处理期间相同的段滚动行为(只是在更短的时间范围内)。要"jump"到最后一个状态并不容易,因为没有足够的前期信息——因此,盲目地重放更新日志是最好的选择。