Kafka加入存储

Kafka join storage

我使用 Kafka 加入两个流,加入时间为 3 天 window:

    ...
 private final long retentionHours = Duration.ofDays(3);

    ...
    var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
                                .grace(Duration.ofMillis(0));
    var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
                                 .withStoreName("STORE-1")
                                 .withName("STORE-2");
    stream1.join(stream2, streamJoiner(), joinWindow, joinStores);

通过上面的实现,我发现 Kafka 创建了状态文件夹:/tmp/kafka-streams,(看起来像 RocksDB)和 它不断增长。 此外,Kafka 集群中的状态存储 不断增长

所以,我将流连接实现更改为:

...
private final long retentionHours = Duration.ofDays(3);
    ...
    var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
                                .grace(Duration.ofMillis(0));
    var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
                                 .withStoreName("STORE-1")
                                 .withName("STORE-2")
                                 .withThisStoreSupplier(createStoreSupplier("MEM-STORE-1"))
                                 .withOtherStoreSupplier(createStoreSupplier("MEM-STORE-2"));
    stream1.join(stream2, streamJoiner(), joinWindow, joinStores);

...
private WindowBytesStoreSupplier createStoreSupplier(String storeName) {
    var window = Duration.ofMinutes(retentionHours * 2)
                         .toMillis();
    return new InMemoryWindowBytesStoreSupplier(storeName, window, window, true);
}

现在,没有状态文件夹:/tmp/kafka-streams.

这是否意味着 InMemoryWindowBytesStoreSupplier 根本不使用磁盘? 如果是,它是如何工作的?

此外,我仍然看到 Kafka 集群中的状态存储不断增长

Does it mean that InMemoryWindowBytesStoreSupplier doesn't use disk at all? If yes, how does it work?

IIRC,InMemoryWindowBytesStore 根本不使用磁盘。

一般来说,逻辑状态存储实际上被划分为多个状态存储'instances'(想想:每个流任务都有自己的本地状态存储实例)。具体来说,对于 InMemoryWindowBytesStore,根据设计,这些存储实例管理内存中的所有本地数据。

Also, I still see that state store in Kafka cluster grows constantly.

然而,InMemoryWindowBytesStore 仍然是 fault-tolerant。 这常常让新的 Kafka Streams 开发人员感到困惑,因为在大多数软件中,“在内存中”总是暗示“如果发生某些事情,数据将丢失”。然而,Kafka Streams 并非如此。无论您使用默认状态存储(使用 RocksDB)还是 in-memory 状态存储,状态存储始终是其 Kafka 更新日志主题的 'backed up' 持久性。 这解释了为什么您在 Kafka 集群中看到 in-memory 状态的(更新日志)数据 数据不应该永远增长,顺便说一句,因为变更日志主题是 compacted 以防止这种情况。

注意:但是,当使用 in-memory 存储时,您的应用程序实例可能 运行 内存不足 (OOM),从而崩溃。虽然您的状态数据永远不会丢失,如上所述,您的应用程序不会由于 OOM 崩溃而 运行ning/它只会部分 运行(某些应用程序实例 运行 OOM,其他人没有)。此 OOM 问题不适用于默认存储 (RocksDB),因为它在磁盘上管理其数据,并且仅将内存 (RAM) 用于缓存目的。但是,同样,这个应用程序可用性问题与数据安全是正交的(无论您的应用程序是否崩溃,您的数据都是安全的)。