从 Spring Cloud Streams Kafka Stream 应用程序中的处理器写入主题
Writing to a topic from a Processor in a Spring Cloud Streams Kafka Stream application
我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入一个主题。如何在 Spring Cloud Streams Kafka 应用程序中完成?
@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->
event.map{
...
}.process(ProcessorSupplier {
object : Processor<EventId, MappedEventValue> {
private lateinit var store: KeyValueStore<EventId, MappedEventValue>
override fun init(context: ProcessorContext) {
store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
}
override fun process(key: EventId, value: MappedEventValue) {
...
store.put(key, processedMappedEventValue)
//TODO Write into a topic
}
}
}
}
你不能。 process()
方法是一种终端操作,不允许您向下游发送数据。相反,您可以使用 transform()
(它与 process()
基本相同,但允许您向下游发送数据);或者根据您的应用,transformValues()
或 flatTransform()
等
使用 transform()
你得到 KStream
回来,你可以写入主题。
我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入一个主题。如何在 Spring Cloud Streams Kafka 应用程序中完成?
@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->
event.map{
...
}.process(ProcessorSupplier {
object : Processor<EventId, MappedEventValue> {
private lateinit var store: KeyValueStore<EventId, MappedEventValue>
override fun init(context: ProcessorContext) {
store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
}
override fun process(key: EventId, value: MappedEventValue) {
...
store.put(key, processedMappedEventValue)
//TODO Write into a topic
}
}
}
}
你不能。 process()
方法是一种终端操作,不允许您向下游发送数据。相反,您可以使用 transform()
(它与 process()
基本相同,但允许您向下游发送数据);或者根据您的应用,transformValues()
或 flatTransform()
等
使用 transform()
你得到 KStream
回来,你可以写入主题。