如何仅用于集成测试的内存数据库替换 RocksDB?
How to replace RocksDB by in-memory db just for integration tests?
由于 RocksDB 目前仍然不支持 Apple Silicon,目前只能使用 x86_64 JDKs via Rosetta,比原生 JDK 慢 5 倍。
因此,我想用内存中的键值存储替换 RocksDB。
Kafka 如何配置为默认使用这样的内存存储?
将storeBuilder
添加到StreamsBuilder
时,您可以选择构建持久性(rocksdb)或内存存储。
final var storeBuilder = Stores.windowStoreBuilder(
// Stores.persistentWindowStore(storeName, Duration.ofMinutes(10), Duration.ofMinutes(1),
// false),
Stores.inMemoryWindowStore(storeName, Duration.ofMinutes(10), Duration.ofMinutes(1),
false),
Serdes.String(),
Serdes.String()
);
builder.addStateStore(storeBuilder);
确保覆盖测试用例中的 storeBuilder
。
和jego的回答很像,不过我是和供应商打交道的。
这就是我配置它们的方式:
@Profile("prod || stage || test")
@Configuration
class PersistentStoreConfiguration {
@Bean
fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier = Stores.persistentKeyValueStore(ProjektanhangStore.NAME)
}
@Profile( "it || dev")
@Configuration
class ProjektInMemoryStoreConfiguration {
@Bean
fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier = Stores.inMemoryKeyValueStore(ProjektanhangStore.NAME)
}
这就是根据 spring 配置文件选择的供应商将被注入和使用的位置和方式。注意@Bean 和@Configuration class 名称。
@Configuration
class ProjektAnhangStreamConfiguration {
@Inject
private lateinit var projektanhangStoreSupplier: KeyValueBytesStoreSupplier
@Bean
fun projektanhaenge() = Consumer<KStream<String, AnhangEvent>> {
it.map { _, v -> KeyValue(v.anhang.projektId, v) }
.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(AnhangEvent::class.java)))
.aggregate(
{ ProjektanhangAggregator() },
{ _, anhangEvent, aggregator ->
when (anhangEvent.action) {
CREATE -> aggregator.add(anhangEvent.anhang)
DELETE -> aggregator.remove(anhangEvent.anhang)
UPDATE -> aggregator.update(anhangEvent.anhang)
}
},
Materialized
.`as`<String, ProjektanhangAggregator>(projektanhangStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde(ProjektanhangAggregator::class.java))
)
}
}
由于 RocksDB 目前仍然不支持 Apple Silicon,目前只能使用 x86_64 JDKs via Rosetta,比原生 JDK 慢 5 倍。 因此,我想用内存中的键值存储替换 RocksDB。 Kafka 如何配置为默认使用这样的内存存储?
将storeBuilder
添加到StreamsBuilder
时,您可以选择构建持久性(rocksdb)或内存存储。
final var storeBuilder = Stores.windowStoreBuilder(
// Stores.persistentWindowStore(storeName, Duration.ofMinutes(10), Duration.ofMinutes(1),
// false),
Stores.inMemoryWindowStore(storeName, Duration.ofMinutes(10), Duration.ofMinutes(1),
false),
Serdes.String(),
Serdes.String()
);
builder.addStateStore(storeBuilder);
确保覆盖测试用例中的 storeBuilder
。
和jego的回答很像,不过我是和供应商打交道的。 这就是我配置它们的方式:
@Profile("prod || stage || test")
@Configuration
class PersistentStoreConfiguration {
@Bean
fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier = Stores.persistentKeyValueStore(ProjektanhangStore.NAME)
}
@Profile( "it || dev")
@Configuration
class ProjektInMemoryStoreConfiguration {
@Bean
fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier = Stores.inMemoryKeyValueStore(ProjektanhangStore.NAME)
}
这就是根据 spring 配置文件选择的供应商将被注入和使用的位置和方式。注意@Bean 和@Configuration class 名称。
@Configuration
class ProjektAnhangStreamConfiguration {
@Inject
private lateinit var projektanhangStoreSupplier: KeyValueBytesStoreSupplier
@Bean
fun projektanhaenge() = Consumer<KStream<String, AnhangEvent>> {
it.map { _, v -> KeyValue(v.anhang.projektId, v) }
.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(AnhangEvent::class.java)))
.aggregate(
{ ProjektanhangAggregator() },
{ _, anhangEvent, aggregator ->
when (anhangEvent.action) {
CREATE -> aggregator.add(anhangEvent.anhang)
DELETE -> aggregator.remove(anhangEvent.anhang)
UPDATE -> aggregator.update(anhangEvent.anhang)
}
},
Materialized
.`as`<String, ProjektanhangAggregator>(projektanhangStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde(ProjektanhangAggregator::class.java))
)
}
}