"Error creating bean with name 'functionBindingRegistrar'" exception every time I start my application

I have a plain Spring Cloud Stream application that simply reads from one Kafka topic and produces messages to another Kafka topic. Here is the configuration:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.6</version>
    <relativePath/>
</parent>

<properties>
    <spring-cloud.version>2020.0.4</spring-cloud.version>
    <spring-boot-maven-plugin.version>2.3.0.RELEASE</spring-boot-maven-plugin.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

And the following application.properties:

#Kafka Configurations
spring.kafka.bootstrap-servers=localhost:9092
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=latest

spring.cloud.function.definition=merchantCredentials;validatedProducts;validateImages;retryUnprocessedItems
#Input topics
#Merchants
spring.cloud.stream.bindings.merchantCredentials-in-0.destination=mis.merchantCtpCredentials
spring.cloud.stream.kafka.bindings.merchantCredentials-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.merchantCredentials-in-0.contentType=application/json
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.merchantCredentials-in-0.group=tuevGroup

#kfc.notifications.product
spring.cloud.stream.bindings.validatedProducts-in-0.destination=kfc.notifications.product
spring.cloud.stream.kafka.bindings.validatedProducts-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validatedProducts-in-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.concurrency=5
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validatedProducts-in-0.group=tuevGroup

#marketplace.products
spring.cloud.stream.bindings.validateImages-in-0.destination=marketplace.products
spring.cloud.stream.kafka.bindings.validateImages-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validateImages-in-0.contentType=application/json
spring.cloud.stream.bindings.validateImages-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validateImages-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validateImages-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validateImages-in-0.group=tuevGroup

#Output topics
#productValidated
spring.cloud.stream.bindings.validatedProducts-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.validatedProducts-out-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-out-0.producer.partition-count=10
spring.cloud.stream.bindings.validatedProducts-out-0.producer.header-mode=headers

spring.cloud.stream.bindings.retryUnprocessedItems-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.contentType=application/json
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.partition-count=10
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.header-mode=headers
spring.cloud.stream.poller.cron=0 0/10 * * * *
spring.cloud.stream.poller.initial-delay=10000

Below are the signatures of all the Spring Cloud functions I have defined:

@Bean
public Consumer<Flux<Message<JsonNode>>> merchantCredentials() {

@Bean
public Function<Message<NotificationMessage>, Message<ProductValidatedEvent>> validatedProducts() {

@Bean
public Consumer<Message<ProductImportMessage>> validateImages() {

@PollableBean
@SchedulerLock(name = "retryProcess_scheduledTask", lockAtMostFor = "${retry.job.lock.atMost}", lockAtLeastFor = "${retry.job.lock.atLeast}")
public Supplier<Flux<Message<ProductValidatedEvent>>> retryUnprocessedItems() {

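Everything works: the application starts and runs normally. However, I see this exception in the logs several times, especially during the startup phase of the application: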

org.springframework.boot.SpringApplication - Application run failed org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer

I have double-checked all the configuration, but I still don't know how to prevent this. Why does this exception occur, and can it be ignored?

Update 1:

I traced the error to this method in the Spring code, in FunctionTypeUtils:

public static Type discoverFunctionTypeFromClass(Class<?> functionalClass) {
    Assert.isTrue(isFunctional(functionalClass), "Type must be one of Supplier, Function or Consumer");

That method is called from this method in FunctionConfiguration:

private String[] filterEligibleFunctionDefinitions() {
...
                for (int i = 0; i < functionNames.length && eligibleDefinition; i++) {
                    String functionName = functionNames[i];
                    if (this.applicationContext.containsBean(functionName)) {

When I set a breakpoint in this method and in the previous one, I got the following output:

functionName: merchantCredentials
functionalClass: com.rewedigital.services.tuev.marketplace.merchant.flow.MerchantFlowManger$$Lambda23/0x00000008008fc040
functionName: validatedProducts
functionalClass: com.rewedigital.services.tuev.marketplace.validator.listener.ProductChangedListener$$Lambda31/0x00000008008fa040
functionName: validateImages
functionalClass: com.rewedigital.services.tuev.marketplace.sieve.listener.ProductImagesListener$$Lambda24/0x00000008008fc440
functionName: retryUnprocessedItems
functionalClass: org.springframework.beans.factory.support.NullBean

This shows that retryUnprocessedItems is the culprit, but I'm not sure why.

After some investigation, I found that the problem comes mainly from the @SchedulerLock annotation.

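I observed that the problem occurs when the ShedLock table already holds a lock for the method. In that case the call to the bean method is skipped, so the bean ends up as a NullBean (as in the debug output above), functionBindingRegistrar cannot register it as a function, and the exception is thrown.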
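The failure mode can be sketched without Spring or ShedLock. In the sketch below, every name is hypothetical: NullPlaceholder stands in for Spring's NullBean, isFunctional is a simplified version of the check behind the "Type must be one of Supplier, Function or Consumer" message, and createBean assumes that a lock-guarded factory call yields null when the lock is held elsewhere. It is an illustration of the mechanism described above, not the actual library code.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class NullBeanSketch {

    /** Hypothetical stand-in for the placeholder a container stores instead of a raw null. */
    static final class NullPlaceholder { }

    /** Simplified check: is the type a Supplier, Function or Consumer? */
    static boolean isFunctional(Class<?> type) {
        return Supplier.class.isAssignableFrom(type)
                || Function.class.isAssignableFrom(type)
                || Consumer.class.isAssignableFrom(type);
    }

    /** Assumed behaviour: the factory call is skipped (null) when the lock is not acquired. */
    static Object createBean(Supplier<Object> factoryMethod, boolean lockAcquired) {
        Object bean = lockAcquired ? factoryMethod.get() : null;
        return bean != null ? bean : new NullPlaceholder();
    }

    /** Returns whether the resulting bean would pass the functional-type check. */
    static boolean registrationSucceeds(boolean lockAcquired) {
        Supplier<String> retry = () -> "retry";
        Object bean = createBean(() -> retry, lockAcquired);
        return isFunctional(bean.getClass());
    }

    public static void main(String[] args) {
        System.out.println(registrationSucceeds(true));   // lock free: bean is a Supplier
        System.out.println(registrationSucceeds(false));  // lock held: bean is the placeholder
    }
}
```

With the lock free, the bean is the Supplier lambda and passes the check; with the lock held, the placeholder is neither a Supplier, a Function nor a Consumer, which matches the assertion failure seen at startup.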

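Of course, this means the annotation is effectively unusable here: what @PollableBean actually runs on each poll is not the annotated method itself but the Supplier lambda that the method returns, which renders ShedLock useless.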
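The difference between the factory method and the Supplier it returns can be shown with plain Java. This is only a sketch: PollSketch, the two counters and simulate are hypothetical names, and the loop stands in for the poller calling the bean repeatedly.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PollSketch {

    static final AtomicInteger factoryCalls = new AtomicInteger();
    static final AtomicInteger supplierCalls = new AtomicInteger();

    /** Plays the role of the annotated bean method: it only builds the Supplier. */
    static Supplier<String> retryUnprocessedItems() {
        factoryCalls.incrementAndGet();
        return () -> "item-" + supplierCalls.incrementAndGet();
    }

    /** Creates the bean once, then polls it the given number of times. */
    static int[] simulate(int polls) {
        factoryCalls.set(0);
        supplierCalls.set(0);
        Supplier<String> bean = retryUnprocessedItems(); // happens once, at startup
        for (int i = 0; i < polls; i++) {
            bean.get();                                  // happens on every poll
        }
        return new int[] { factoryCalls.get(), supplierCalls.get() };
    }

    public static void main(String[] args) {
        int[] counts = simulate(5);
        System.out.println("factory calls: " + counts[0] + ", supplier calls: " + counts[1]);
    }
}
```

Anything that wraps only the factory method, such as a lock annotation placed on it, sees a single call at startup and none of the later polls.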

As soon as I removed the annotation, all the exceptions disappeared, the sun shone again, the birds sang, and so on.

The next question to answer is how to use pollable beans in a distributed fashion, but that is beyond the scope of this question.