Kafka Streams:动态配置 RocksDb

Kafka Streams: Dynamically Configure RocksDb

我想调整 Kafka Streams 的性能,为此我必须使用 RocksDb 配置值。

我知道我可以使用 StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG 来设置 RocksDB 的配置。如图所示 here.

但我想要一种从配置中动态配置它的方法(以便更改阈值而无需编译和部署我的所有代码。

是否可以选择以某种方式给出具体的实现实例而不是 class 名称? (这将允许设置阈值,例如,使用属性文件)

嗯,您可以在运行时从属性文件中传递配置参数,并在 StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG 中设置 ClassName。这样,您就不必一次又一次地重新编译和重新部署您的代码。相反,在运行时,您将能够传递不同的 2 个属性值。

示例:您可以按如下方式实现 CustomRockDBConfig:

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    public static long blockCacheSize = 50*1024*1024L;
    public static long blockSize = 4096L;
    public static boolean cacheIndexAndFilterBlock = false;

    public static Logger log = Logger.getLogger(CustomRocksDBConfig.class);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {

        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        // Reducing default block cache size
        tableConfig.setBlockCacheSize(blockCacheSize);
        // Increasing the block size as default block size is only 4KB
        tableConfig.setBlockSize(blockSize);
        // Index and Filter block
        tableConfig.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlock);

        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);


    }

}

在设置 StreamsConfig 属性时,添加以下属性。

CustomRocksDBConfig.blockCacheSize = properties.get("blockCacheSize");
CustomRocksDBConfig.blockSize = properties.get("blockSize");
CustomRocksDBConfig.cacheIndexAndFilterBlock = properties.get("cacheIndexAndFilterBlock");
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);

无需重新编译代码!它将始终从运行时属性文件中读取值。