可能由于 ktable 相关状态存储导致内存不足异常

Getting Out of Memory exception possibly due to the ktable related state store

我们有一个 kstreams 应用程序在执行 kstream-kstable 内连接。这两个主题都是高容量的,每个都有 256 个分区。 kstreams 应用程序现在部署在 8 个节点上,每个节点有 8 GB 堆。我们看到堆内存不断增长,最终发生OOM。我无法将堆转储作为其 运行 在发生这种情况时被杀死的容器中。但是,我已经尝试了一些事情来确信它与状态 stores/ktable 相关的东西有关。如果没有下面的 RocksDBConfigSetter 内存会很快用完,但是下面的内存会在一定程度上减慢。需要一些指导才能继续,谢谢

我添加了以下 3 个属性,

properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1 * 1024L);
           properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
           properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);


public static class CustomRocksDBConfig implements RocksDBConfigSetter {

       
        private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(1 * 1024L * 1024L);

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            log.info("In CustomRocksDBConfig");

            BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
            tableConfig.setBlockCache(cache);

            tableConfig.setBlockSize(1 * 1024L);

            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setTableFormatConfig(tableConfig);

            options.setMaxWriteBufferNumber(2);
        }

        @Override
        public void close(final String storeName, final Options options) {

            cache.close();
        } 

您可以尝试在一个节点上的所有 RocksDB 实例中限制 RocksDB 的内存使用。为此,您必须配置 RocksDB 以在块缓存中缓存索引和过滤块,通过共享 WriteBufferManager 限制 memtable 内存并将其内存计入块缓存,然后传递相同的 Cache对象到每个实例。您可以在

下找到更多详细信息和示例配置

https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html#rocksdb

通过这样的设置,您可以为单个实例上所有 RocksDB 状态存储使用的总堆指定一个软上限(示例配置中的 TOTAL_OFF_HEAP_MEMORY),然后指定该堆有多少是用于写入和读取单个节点上的状态存储(示例配置中分别为 TOTAL_MEMTABLE_MEMORY 和 INDEX_FILTER_BLOCK_RATIO)。

由于所有值都是特定于应用程序和工作负载的,因此您需要对它们进行试验并使用 metrics provided by Kafka Streams.

监控 RocksDB 状态存储

有关如何处理 Kafka Streams 中的 RocksDB 问题的指南,请参见:

https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/

特别是对于您的情况,以下部分可能很有趣:

https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/#high-memory-usage