如何仅用于集成测试的内存数据库替换 RocksDB?

How to replace RocksDB by in-memory db just for integration tests?

由于 RocksDB 目前仍然不支持 Apple Silicon,目前只能使用 x86_64 JDKs via Rosetta,比原生 JDK 慢 5 倍。 因此,我想用内存中的键值存储替换 RocksDB。 Kafka 如何配置为默认使用这样的内存存储?

storeBuilder添加到StreamsBuilder时,您可以选择构建持久性(rocksdb)或内存存储。

    final var storeBuilder = Stores.windowStoreBuilder(
//        Stores.persistentWindowStore(storeName, Duration.ofMinutes(10), Duration.ofMinutes(1),
//            false),
        Stores.inMemoryWindowStore(storeName, Duration.ofMinutes(10), Duration.ofMinutes(1),
            false),
        Serdes.String(),
        Serdes.String()
    );
    builder.addStateStore(storeBuilder);

确保覆盖测试用例中的 storeBuilder

和jego的回答很像,不过我是和供应商打交道的。 这就是我配置它们的方式:

@Profile("prod || stage || test")
@Configuration
class PersistentStoreConfiguration {
    @Bean
    fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier = Stores.persistentKeyValueStore(ProjektanhangStore.NAME)
}

@Profile( "it || dev")
@Configuration
class ProjektInMemoryStoreConfiguration {
    @Bean
    fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier = Stores.inMemoryKeyValueStore(ProjektanhangStore.NAME)
}

这就是根据 spring 配置文件选择的供应商将被注入和使用的位置和方式。注意@Bean 和@Configuration class 名称。

@Configuration
class ProjektAnhangStreamConfiguration {
    @Inject
    private lateinit var projektanhangStoreSupplier: KeyValueBytesStoreSupplier

    @Bean
    fun projektanhaenge() = Consumer<KStream<String, AnhangEvent>> {
        it.map { _, v -> KeyValue(v.anhang.projektId, v) }
            .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(AnhangEvent::class.java)))
            .aggregate(
                { ProjektanhangAggregator() },
                { _, anhangEvent, aggregator ->
                    when (anhangEvent.action) {
                        CREATE -> aggregator.add(anhangEvent.anhang)
                        DELETE -> aggregator.remove(anhangEvent.anhang)
                        UPDATE -> aggregator.update(anhangEvent.anhang)
                    }
                },
                Materialized
                    .`as`<String, ProjektanhangAggregator>(projektanhangStoreSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(ProjektanhangAggregator::class.java))
            )
    }
}