Dynamically materialize KTable aggregate to different state stores
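Is there a way to dynamically choose which state store a KTable is materialized into? Currently the store name is `MonthlyAggregates.NAME`, but I would like to materialize each month into a state store named after the current month, e.g. `2020-JULY`. However, since the topology is read and built once at system startup, only the state store chosen during initialization is ever used.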
```kotlin
@Bean
fun leistung() =
    Consumer<KStream<ByteArray, ByteArray>> {
        it
            .transform(TransformerSupplier { EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
            .map { _, v -> KeyValue(createKey(v.payload as VerrechenbareLeistungPerformedEvent), v.payload) }
            .peek { key, value -> validateTechnicalLocation(key, value) }
            .transform(TransformerSupplier { DuplicateFilteringTransformer() }, LeistungEvents.NAME)
            .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(VerrechenbareLeistungPerformedEvent::class.java)))
            .aggregate(
                { ItemAggregator() },
                { _, event, aggregator -> aggregator.addItem(event) },
                Named.`as`("aggregate"),
                // The store name is fixed here, once, when the topology is built at startup
                Materialized.`as`<String, ItemAggregator, KeyValueStore<Bytes, ByteArray>>(MonthlyAggregates.NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(ItemAggregator::class.java))
            )
    }
```
I also tried adding the state store dynamically via the Processor API:
```kotlin
class DuplicateFilteringProcessor(private val topology: Topology) : Processor<String, VerrechenbareLeistungPerformedEvent> {
    private lateinit var processorContext: ProcessorContext
    private lateinit var stateStore: KeyValueStore<String, VerrechenbareLeistungPerformedEvent>

    override fun init(context: ProcessorContext) {
        processorContext = context
        val storeBuilder = KeyValueStoreBuilder<String, VerrechenbareLeistungPerformedEvent>(
            Stores.persistentKeyValueStore(LeistungEvents.NAME),
            Serdes.StringSerde(),
            JsonSerde(VerrechenbareLeistungPerformedEvent::class.java),
            Time.SYSTEM
        )
        // init() runs after the KafkaStreams instance was created from the topology,
        // so changing the Topology object here does not affect the running topology
        topology.addStateStore(storeBuilder, "filter_duplicates")
        topology.connectProcessorAndStateStores("filter_duplicates", LeistungEvents.NAME)
        val store = storeBuilder.build()
        // register() is normally invoked by a store from its own init(); this store
        // is never initialized by Kafka Streams itself
        processorContext.register(store) { _, _ -> }
        stateStore = processorContext.getStateStore(LeistungEvents.NAME) as KeyValueStore<String, VerrechenbareLeistungPerformedEvent>
    }

    override fun process(key: String, value: VerrechenbareLeistungPerformedEvent) {
        val eventInStateStore = stateStore.get(value.businessId)
        if (eventInStateStore == null) {
            stateStore.putIfAbsent(value.businessId, value)
            processorContext.forward(key, value)
        } else {
            logger().error(
                """
                Event with businessId ${value.businessId} has already been processed
                Event in state store: $eventInStateStore
                Event just received: $value
                """.trimIndent()
            )
        }
        // processorContext.commit()
    }

    override fun close() {
        // stateStore closing will be managed by Kafka/Spring
    }
}
```
In this case, however, the state store never transitions to the open state, and every request results in an NPE.
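The month label the question asks for, such as `2020-JULY`, does not have to live in the store name. A minimal sketch, assuming UTC as the reference zone and a hypothetical `<month>|<key>` key format (neither comes from the original thread): derive the label from an event timestamp and prefix it to the grouping key, so that the single store `MonthlyAggregates.NAME` keeps one aggregate per key and month. The function names `monthLabel` and `monthlyKey` are illustrative only.

```kotlin
import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset

// Hypothetical helper: turns an event timestamp into a month label such as "2020-JULY".
// UTC is an assumption; use the zone in which a "month" is defined for your business.
fun monthLabel(epochMillis: Long, zone: ZoneId = ZoneOffset.UTC): String {
    val date = Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate()
    // Month.name is the enum constant name, e.g. "JULY"
    return "${date.year}-${date.month.name}"
}

// Hypothetical helper: prefixes the business key with the month label, so one
// fixed-name store can hold a separate aggregate for each (month, key) pair.
fun monthlyKey(key: String, epochMillis: Long, zone: ZoneId = ZoneOffset.UTC): String =
    "${monthLabel(epochMillis, zone)}|$key"
```

For example, with `1594000000000L` (2020-07-06T01:46:40Z), `monthLabel` returns `2020-JULY` and `monthlyKey("abc", 1594000000000L)` returns `2020-JULY|abc`. In the topology, such a key could be built in the existing `map` step (for instance inside `createKey`) before `groupByKey`; which timestamp to use, such as a field of `VerrechenbareLeistungPerformedEvent`, is an assumption that depends on the event model.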
This is not supported. A state store cannot be renamed.
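Because the store name stays fixed, per-month reads have to select by key instead of by store. A minimal sketch, assuming keys of the hypothetical form `<year>-<MONTH>|<businessKey>` and using a `TreeMap` as a stand-in for the store: Kafka Streams key-value stores iterate keys in sorted (serialized byte) order, so for ASCII keys like these one month forms a contiguous key range, which on a real store would correspond to a `range(from, to)` query. The function `aggregatesForMonth` is illustrative only.

```kotlin
import java.util.TreeMap

// Returns one month's aggregates from a sorted store whose keys look like
// "<year>-<MONTH>|<businessKey>" (hypothetical format), with the month prefix stripped.
fun <V> aggregatesForMonth(store: TreeMap<String, V>, month: String): Map<String, V> {
    val prefix = "$month|"
    // Exclusive upper bound: replace the trailing '|' with the next character ('}').
    // Every key in [prefix, upperExclusive) starts with the prefix, and vice versa.
    val upperExclusive = prefix.dropLast(1) + (prefix.last() + 1)
    val slice = store.subMap(prefix, true, upperExclusive, false)
    return slice.mapKeys { it.key.removePrefix(prefix) }
}
```

With month names the keys sort alphabetically (`2020-AUGUST` before `2020-JULY`), not chronologically; a numeric label such as `2020-07` would keep consecutive months adjacent if range scans across several months matter.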