具有 2 个功能的 Apache Kafka Streams 应用程序无法正常工作
Apache Kafka Streams Application with 2 functions not working
我得到了 2 个应用程序和 3 个 Kafka 主题。第一个应用程序将消息生成到 pos-topic
。该应用程序正在运行,消息已写入主题。在另一个应用程序(不起作用)中,我有 2 个函数在听那个主题:
hadoop:
@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> {
return Function { input: KStream<String, PosInvoice> ->
input.mapValues { v -> recordBuilder.getMaskedInvoice(v) }
.flatMapValues { v -> recordBuilder.getHadoopRecords(v) }
}
}
通知:
@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> {
return Function { input -> input.filter { _, v -> v.customerType.equals("PRIME", ignoreCase = true) }
.mapValues { v -> recordBuilder.getNotification(v) }}
}
hadoop 函数应生成 hadoop-sink-topic
,通知函数应生成 notification-topic
。这两个功能都在 @Configuration
class.
内
main class 看起来像这样:
@SpringBootApplication
class JsonposfanoutApplication
fun main(args: Array<String>) {
runApplication<JsonposfanoutApplication>(*args)
}
这里是 application.yml :
spring:
cloud:
stream:
bindings:
notification-in-0.destination: pos-topic
notification-out-0.destination: loyalty-topic
hadoop-in-0.destination: pos-topic
hadoop-out-0.destination: hadoop-sink-topic
function:
definition: notification;hadoop
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
schema.registry.url: http://localhost:8081
bindings:
notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
反序列化和序列化的 classes 是 Json 友好的。第一个应用程序将一个 Json 序列化的值写入主题。所以我不需要用 Spring Cloud Streams 明确定义我的 consumer.value.serdes
和我的 key.serdes
对吗?
怎么了?
我不知道。我没有收到任何错误或异常。应用程序刚刚启动并在几秒钟后完成。我没有关于我的处理器的信息,所以我认为它们甚至没有启动。
我从 Spring Boot 得到的所有可疑信息是:
@Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
我尝试了什么?
我在 application.yml 中尝试了很多,但没有任何帮助,我将所有内容都恢复到第一个版本。也许我在其他地方遗漏了什么?
我认为对于 Kotlin 应用程序,您需要单独定义 Configuration
class。下面的设置对我有用,我看到两个功能都被激活了。
@SpringBootApplication
class DemoKotlinApplication
fun main(args: Array<String>) {
runApplication<DemoKotlinApplication>(*args)
}
和
@Configuration
class DemoKotlinConfiguration {
@Bean
fun foo(): java.util.function.Function<KStream<String, String>, KStream<String, String>> {
return Function {
input: KStream<String, String> -> input
}
}
@Bean
fun bar(): java.util.function.Function<KStream<String, String>, KStream<String, String>> {
return Function {
input: KStream<String, String> -> input
}
}
}
application.properties
spring.cloud.stream.function.definition=foo;bar
将 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams
(版本 3.1.2)更改为 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT
解决了我的问题。出于某种原因,我没有得到这个例外:
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
在我的应用程序中,但在另一个应用程序中。
这是当时有用的答案:
我得到了 2 个应用程序和 3 个 Kafka 主题。第一个应用程序将消息生成到 pos-topic
。该应用程序正在运行,消息已写入主题。在另一个应用程序(不起作用)中,我有 2 个函数在听那个主题:
hadoop:
@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> {
return Function { input: KStream<String, PosInvoice> ->
input.mapValues { v -> recordBuilder.getMaskedInvoice(v) }
.flatMapValues { v -> recordBuilder.getHadoopRecords(v) }
}
}
通知:
@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> {
return Function { input -> input.filter { _, v -> v.customerType.equals("PRIME", ignoreCase = true) }
.mapValues { v -> recordBuilder.getNotification(v) }}
}
hadoop 函数应生成 hadoop-sink-topic
,通知函数应生成 notification-topic
。这两个功能都在 @Configuration
class.
main class 看起来像这样:
@SpringBootApplication
class JsonposfanoutApplication
fun main(args: Array<String>) {
runApplication<JsonposfanoutApplication>(*args)
}
这里是 application.yml :
spring:
cloud:
stream:
bindings:
notification-in-0.destination: pos-topic
notification-out-0.destination: loyalty-topic
hadoop-in-0.destination: pos-topic
hadoop-out-0.destination: hadoop-sink-topic
function:
definition: notification;hadoop
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
schema.registry.url: http://localhost:8081
bindings:
notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
反序列化和序列化的 classes 是 Json 友好的。第一个应用程序将一个 Json 序列化的值写入主题。所以我不需要用 Spring Cloud Streams 明确定义我的 consumer.value.serdes
和我的 key.serdes
对吗?
怎么了?
我不知道。我没有收到任何错误或异常。应用程序刚刚启动并在几秒钟后完成。我没有关于我的处理器的信息,所以我认为它们甚至没有启动。
我从 Spring Boot 得到的所有可疑信息是:
@Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
我尝试了什么?
我在 application.yml 中尝试了很多,但没有任何帮助,我将所有内容都恢复到第一个版本。也许我在其他地方遗漏了什么?
我认为对于 Kotlin 应用程序,您需要单独定义 Configuration
class。下面的设置对我有用,我看到两个功能都被激活了。
@SpringBootApplication
class DemoKotlinApplication
fun main(args: Array<String>) {
runApplication<DemoKotlinApplication>(*args)
}
和
@Configuration
class DemoKotlinConfiguration {
@Bean
fun foo(): java.util.function.Function<KStream<String, String>, KStream<String, String>> {
return Function {
input: KStream<String, String> -> input
}
}
@Bean
fun bar(): java.util.function.Function<KStream<String, String>, KStream<String, String>> {
return Function {
input: KStream<String, String> -> input
}
}
}
application.properties
spring.cloud.stream.function.definition=foo;bar
将 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams
(版本 3.1.2)更改为 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT
解决了我的问题。出于某种原因,我没有得到这个例外:
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
在我的应用程序中,但在另一个应用程序中。
这是当时有用的答案: