具有 2 个功能的 Apache Kafka Streams 应用程序无法正常工作

Apache Kafka Streams Application with 2 functions not working

我得到了 2 个应用程序和 3 个 Kafka 主题。第一个应用程序将消息生成到 pos-topic。该应用程序正在运行,消息已写入主题。在另一个应用程序(不起作用)中,我有 2 个函数在听那个主题:

hadoop:

@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> {
    return Function { input: KStream<String, PosInvoice> ->
        input.mapValues { v -> recordBuilder.getMaskedInvoice(v) }
            .flatMapValues { v -> recordBuilder.getHadoopRecords(v) }
    }
}

通知:

@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> {
    return Function { input -> input.filter { _, v -> v.customerType.equals("PRIME", ignoreCase = true) }
        .mapValues { v -> recordBuilder.getNotification(v) }}
}

hadoop 函数应生成 hadoop-sink-topic,通知函数应生成 notification-topic。这两个功能都在 @Configuration class.

main class 看起来像这样:

@SpringBootApplication
class JsonposfanoutApplication
fun main(args: Array<String>) {
   runApplication<JsonposfanoutApplication>(*args)
}

这里是 application.yml :

spring:
  cloud:
    stream:
      bindings:
        notification-in-0.destination: pos-topic
        notification-out-0.destination: loyalty-topic
        hadoop-in-0.destination: pos-topic
        hadoop-out-0.destination: hadoop-sink-topic
      function:
        definition: notification;hadoop
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
            hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde

反序列化和序列化的 classes 是 Json 友好的。第一个应用程序将一个 Json 序列化的值写入主题。所以我不需要用 Spring Cloud Streams 明确定义我的 consumer.value.serdes 和我的 key.serdes 对吗?

怎么了?

我不知道。我没有收到任何错误或异常。应用程序刚刚启动并在几秒钟后完成。我没有关于我的处理器的信息,所以我认为它们甚至没有启动。 我从 Spring Boot 得到的所有可疑信息是: @Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.

我尝试了什么?

我在 application.yml 中尝试了很多,但没有任何帮助,我将所有内容都恢复到第一个版本。也许我在其他地方遗漏了什么?

我认为对于 Kotlin 应用程序,您需要单独定义 Configuration class。下面的设置对我有用,我看到两个功能都被激活了。

@SpringBootApplication
class DemoKotlinApplication

fun main(args: Array<String>) {
    runApplication<DemoKotlinApplication>(*args)
}

@Configuration
class DemoKotlinConfiguration {

    @Bean
    fun foo(): java.util.function.Function<KStream<String, String>, KStream<String, String>> {
        return Function { 
           input: KStream<String, String> -> input
        }
    }

    @Bean
    fun bar(): java.util.function.Function<KStream<String, String>, KStream<String, String>> {
        return Function { 
           input: KStream<String, String> -> input
        }
    }
}

application.properties

spring.cloud.stream.function.definition=foo;bar

org.springframework.cloud:spring-cloud-stream-binder-kafka-streams(版本 3.1.2)更​​改为 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT 解决了我的问题。出于某种原因,我没有得到这个例外: org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null 在我的应用程序中,但在另一个应用程序中。 这是当时有用的答案: