如何使用 spring-cloud-function 使用 Kafka-events,但有时只产生
How to consume Kafka-events with spring-cloud-function, but only sometimes produce
我已经在 Kotlin 中使用 kafka-streams 成功设置了 spring-cloud-function 并且有一个工作原型。
我有一个函数可以从一个主题接收事件并为另一个主题生成另一个事件。
这是我的 application.yml
spring:
cloud:
stream:
bindings:
consumeAndProduce-in-0:
destination: topicToConsumeFrom
consumeAndProduce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consumeAndProduce
我的 kotlin 应用程序如下所示:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}
@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
return { message ->
doSomethingAndReturnProducerEvent(message)
}
}
}
我遵循了 Internet 上的许多示例,它非常有效。一旦将消息放入 topicToConsumeFrom
,将调用该函数并将结果写入 topicToProduceTo
.
现在我的问题是:
如果我的功能 并不总是 产生某些东西,那么正确的处理方法是什么?服务侦听主题并忽略消息是一种非常常见的用例。它应该只对特定消息做出反应,然后才产生输出,否则什么也不做。我通过创建一个不同的生产者函数来尝试,只有在适用时我才会从消费者那里调用:
这是我改编的 application.yml
现在定义了两个不同的函数 consume
和 produce
:
spring:
cloud:
stream:
bindings:
consume-in-0:
destination: topicToConsumeFrom
produce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consume;produce
在我的应用程序中,我将 consumeAndProduce()
重命名为 consume()
,return 值现在是 Unit
。另外,我创建了第二个 bean produce()
,它除了 return 一个函数什么都不做,return 是我给它的有效负载:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}
@Bean
fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
return { message ->
if (isSomethingIWantToReactTo(message)) {
val result = doSomethingAndReturnProduceEvent(message)
producer(result)
}
}
}
@Bean
fun produce(): (ProducerEvent) -> ProducerEvent {
return { it }
}
}
现在,如果主题中有消息,并且消息是给我的,则调用 consume()
-函数,我会做一些事情并用我的结果调用 produce()
-函数,否则什么也不做。
我在传出主题中看不到任何消息。
我通过调试知道,它进入了 if-branch 并用我的结果调用 produce()
,但似乎 kafka-binding 不起作用并且没有真正发送任何消息。
我知道,我的方法有些幼稚,但经过广泛研究后,我找不到任何描述此用例的内容。所有示例始终具有 一个 函数,在所有情况下都使用和生成。
有正确的方法吗?
基于第一个代码示例,有两种可能的解决方案,仅使用一个函数 consumeAndProduce()
用于消费和生产:
- Return 一个可为 null 的事件。如果函数returns
null
,不会产生事件:
@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? {
return { message ->
if(isSomethingIWantToReactTo(message)) {
doSomethingAndReturnProducerEvent(message)
}
null
}
}
- 使用
KStream
s 消费事件流并产生事件流。这样可以简单地过滤任何不适用的传入事件:
@Bean
fun consumeAndProduce(): (KStream<ConsumerEvent>) -> KStream<ProducerEvent> {
return { messages ->
messages
.filter(message -> isSomethingIWantToReactTo(message))
.map(message -> doSomethingAndReturnProducerEvent(message))
}
}
注意:我没有遵循这种方法,也没有尝试过,因此建议研究 KStreams。
我已经在 Kotlin 中使用 kafka-streams 成功设置了 spring-cloud-function 并且有一个工作原型。
我有一个函数可以从一个主题接收事件并为另一个主题生成另一个事件。
这是我的 application.yml
spring:
cloud:
stream:
bindings:
consumeAndProduce-in-0:
destination: topicToConsumeFrom
consumeAndProduce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consumeAndProduce
我的 kotlin 应用程序如下所示:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}
@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
return { message ->
doSomethingAndReturnProducerEvent(message)
}
}
}
我遵循了 Internet 上的许多示例,它非常有效。一旦将消息放入 topicToConsumeFrom
,将调用该函数并将结果写入 topicToProduceTo
.
现在我的问题是: 如果我的功能 并不总是 产生某些东西,那么正确的处理方法是什么?服务侦听主题并忽略消息是一种非常常见的用例。它应该只对特定消息做出反应,然后才产生输出,否则什么也不做。我通过创建一个不同的生产者函数来尝试,只有在适用时我才会从消费者那里调用:
这是我改编的 application.yml
现在定义了两个不同的函数 consume
和 produce
:
spring:
cloud:
stream:
bindings:
consume-in-0:
destination: topicToConsumeFrom
produce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consume;produce
在我的应用程序中,我将 consumeAndProduce()
重命名为 consume()
,return 值现在是 Unit
。另外,我创建了第二个 bean produce()
,它除了 return 一个函数什么都不做,return 是我给它的有效负载:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}
@Bean
fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
return { message ->
if (isSomethingIWantToReactTo(message)) {
val result = doSomethingAndReturnProduceEvent(message)
producer(result)
}
}
}
@Bean
fun produce(): (ProducerEvent) -> ProducerEvent {
return { it }
}
}
现在,如果主题中有消息,并且消息是给我的,则调用 consume()
-函数,我会做一些事情并用我的结果调用 produce()
-函数,否则什么也不做。
我在传出主题中看不到任何消息。
我通过调试知道,它进入了 if-branch 并用我的结果调用 produce()
,但似乎 kafka-binding 不起作用并且没有真正发送任何消息。
我知道,我的方法有些幼稚,但经过广泛研究后,我找不到任何描述此用例的内容。所有示例始终具有 一个 函数,在所有情况下都使用和生成。
有正确的方法吗?
基于第一个代码示例,有两种可能的解决方案,仅使用一个函数 consumeAndProduce()
用于消费和生产:
- Return 一个可为 null 的事件。如果函数returns
null
,不会产生事件:
@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? {
return { message ->
if(isSomethingIWantToReactTo(message)) {
doSomethingAndReturnProducerEvent(message)
}
null
}
}
- 使用
KStream
s 消费事件流并产生事件流。这样可以简单地过滤任何不适用的传入事件:
@Bean
fun consumeAndProduce(): (KStream<ConsumerEvent>) -> KStream<ProducerEvent> {
return { messages ->
messages
.filter(message -> isSomethingIWantToReactTo(message))
.map(message -> doSomethingAndReturnProducerEvent(message))
}
}
注意:我没有遵循这种方法,也没有尝试过,因此建议研究 KStreams。