Kafka Stream BoundedMemoryRocksDBConfig

Kafka Stream BoundedMemoryRocksDBConfig

我正在尝试了解 Kafka Streams 的内部结构如何在缓存和 RocksDB(状态存储)方面工作。

        KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
                .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
                .reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(timeToWaitForMoreEvents),
                        Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

在我的拓扑的上述部分中,我正在使用具有 300 个分区的 Kafka 主题。该应用程序部署在 OpenShift 上,内存分配为 4GB。我注意到应用程序的内存不断增加,直到最终出现 OOMKILLED。经过一些研究,我了解到自定义 RocksDB 配置是我应该实现的,因为默认大小对我的应用程序来说太大了。记录首先进入缓存(由 CACHE_MAX_BYTES_BUFFERING_CONFIG 和 COMMIT_INTERVAL_MS_CONFIG 配置),然后进入状态存储。

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

  private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(1 * 1024 * 1024L, -1, false, 0);
  private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(1 * 1024 * 1024L, cache);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

    // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
    tableConfig.setBlockCache(cache);
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setWriteBufferManager(writeBufferManager);

    // These options are recommended to be set when bounding the total memory
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
    tableConfig.setPinTopLevelIndexAndFilter(true);
    tableConfig.setBlockSize(2048L);
    options.setMaxWriteBufferNumber(2);
    options.setWriteBufferSize(1 * 1024 * 1024L);

    options.setTableFormatConfig(tableConfig);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
  }
}

每次 window 段,默认创建三个段。如果我从 300 个分区中消费,由于将为每个分区创建 3 次 window 段,因此将创建 900 个 RocksDB 实例。以下是正确的,我的理解是否正确?

 Memory allocated in OpenShift / RocksDB instances => 4096MB / 900 => 4.55 MB
 
 (WriteBufferSize * MaxWriteBufferNumber) + BlockCache + WriteBufferManager => (1MB * 2) + 1MB + 1MB => 4MB

BoundedMemoryRocksDBConfig.java 是针对每个 RocksDB 实例,还是针对所有实例?

如果你从一个有 300 个分区的主题中消费并且你使用分段状态存储,即你在 DSL 中使用时间 window,你最终会得到 900 个 RocksDB 实例。如果你只使用一个 Kafka Streams 客户端,即不横向扩展,那么所有 900 个 RocksDB 实例最终将在同一个计算节点上。

BoundedMemoryRocksDBConfig 限制了 RocksDB 每个 Kafka Streams 客户端使用的内存。这意味着,如果您只使用一个 Kafka Streams 客户端,BoundedMemoryRocksDBConfig 会限制所有 900 个实例的内存。

Is my understanding correct that the following is true?

Memory allocated in OpenShift / RocksDB instances => 4096MB / 900 => 4.55 MB

(WriteBufferSize * MaxWriteBufferNumber) + BlockCache + WriteBufferManager => (1MB * 2) + 1MB + 1MB => 4MB

不,那是不正确的。

如果将 Cache 传递给 WriteBufferManager,内存表所需的大小也会计入缓存(请参阅 docs of the BoundedMemoryRocksDBConfig and the RocksDB docs 中的脚注 1)。因此,您传递给缓存的大小是内存表和块缓存的限制。由于您将缓存和写入缓冲区管理器传递给同一计算节点上的所有实例,因此所有 900 个实例都受您传递给缓存的大小的限制。例如,如果您指定 4 GB 的大小,则所有 900 个实例(假设一个 Kafka Streams 客户端)使用的总内存限制为 4 GB。

请注意,传递给缓存的大小并非严格限制。尽管缓存构造函数中的布尔参数为您提供了强制执行严格限制的选项,但如果由于 RocksDB 版本 Kafka Streams 中的 bug 而将写入缓冲区内存也计入缓存,则强制执行不起作用使用。

使用 Kafka Streams 2.7.0,您将可以使用通过 JMX 公开的指标来监控 RocksDB 内存消耗。查看 KIP-607 了解更多详情。