Kafka Streams:动态配置 RocksDb
Kafka Streams: Dynamically Configure RocksDb
我想调整 Kafka Streams 的性能,为此我必须使用 RocksDb 配置值。
我知道我可以使用 StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
来设置 RocksDB 的配置。如图所示 here.
但我想要一种从配置中动态配置它的方法(以便更改阈值而无需编译和部署我的所有代码。
是否可以选择以某种方式给出具体的实现实例而不是 class 名称? (这将允许设置阈值,例如,使用属性文件)
嗯,您可以在运行时从属性文件中传递配置参数,并在 StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
中设置 ClassName。这样,您就不必一次又一次地重新编译和重新部署您的代码。相反,在运行时,您将能够传递不同的 2 个属性值。
示例:您可以按如下方式实现 CustomRockDBConfig:
public class CustomRocksDBConfig implements RocksDBConfigSetter {
public static long blockCacheSize = 50*1024*1024L;
public static long blockSize = 4096L;
public static boolean cacheIndexAndFilterBlock = false;
public static Logger log = Logger.getLogger(CustomRocksDBConfig.class);
@Override
public void setConfig(String storeName, Options options, Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
// Reducing default block cache size
tableConfig.setBlockCacheSize(blockCacheSize);
// Increasing the block size as default block size is only 4KB
tableConfig.setBlockSize(blockSize);
// Index and Filter block
tableConfig.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlock);
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(2);
}
}
在设置 StreamsConfig 属性时,添加以下属性。
CustomRocksDBConfig.blockCacheSize = properties.get("blockCacheSize");
CustomRocksDBConfig.blockSize = properties.get("blockSize");
CustomRocksDBConfig.cacheIndexAndFilterBlock = properties.get("cacheIndexAndFilterBlock");
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
无需重新编译代码!它将始终从运行时属性文件中读取值。
我想调整 Kafka Streams 的性能,为此我必须使用 RocksDb 配置值。
我知道我可以使用 StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
来设置 RocksDB 的配置。如图所示 here.
但我想要一种从配置中动态配置它的方法(以便更改阈值而无需编译和部署我的所有代码。
是否可以选择以某种方式给出具体的实现实例而不是 class 名称? (这将允许设置阈值,例如,使用属性文件)
嗯,您可以在运行时从属性文件中传递配置参数,并在 StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
中设置 ClassName。这样,您就不必一次又一次地重新编译和重新部署您的代码。相反,在运行时,您将能够传递不同的 2 个属性值。
示例:您可以按如下方式实现 CustomRockDBConfig:
public class CustomRocksDBConfig implements RocksDBConfigSetter {
public static long blockCacheSize = 50*1024*1024L;
public static long blockSize = 4096L;
public static boolean cacheIndexAndFilterBlock = false;
public static Logger log = Logger.getLogger(CustomRocksDBConfig.class);
@Override
public void setConfig(String storeName, Options options, Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
// Reducing default block cache size
tableConfig.setBlockCacheSize(blockCacheSize);
// Increasing the block size as default block size is only 4KB
tableConfig.setBlockSize(blockSize);
// Index and Filter block
tableConfig.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlock);
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(2);
}
}
在设置 StreamsConfig 属性时,添加以下属性。
CustomRocksDBConfig.blockCacheSize = properties.get("blockCacheSize");
CustomRocksDBConfig.blockSize = properties.get("blockSize");
CustomRocksDBConfig.cacheIndexAndFilterBlock = properties.get("cacheIndexAndFilterBlock");
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
无需重新编译代码!它将始终从运行时属性文件中读取值。