"Error creating bean with name 'functionBindingRegistrar'" exception every time I start my application
I have a plain Spring Cloud Stream application that simply reads data from one Kafka topic and produces messages to another Kafka topic. The configuration is below:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<properties>
<spring-cloud.version>2020.0.4</spring-cloud.version>
<spring-boot-maven-plugin.version>2.3.0.RELEASE</spring-boot-maven-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
and the following application.properties:
#Kafka Configurations
spring.kafka.bootstrap-servers=localhost:9092
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=latest
spring.cloud.function.definition=merchantCredentials;validatedProducts;validateImages;retryUnprocessedItems
#Input topics
#Merchants
spring.cloud.stream.bindings.merchantCredentials-in-0.destination=mis.merchantCtpCredentials
spring.cloud.stream.kafka.bindings.merchantCredentials-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.merchantCredentials-in-0.contentType=application/json
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.merchantCredentials-in-0.group=tuevGroup
#kfc.notifications.product
spring.cloud.stream.bindings.validatedProducts-in-0.destination=kfc.notifications.product
spring.cloud.stream.kafka.bindings.validatedProducts-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validatedProducts-in-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.concurrency=5
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validatedProducts-in-0.group=tuevGroup
#marketplace.products
spring.cloud.stream.bindings.validateImages-in-0.destination=marketplace.products
spring.cloud.stream.kafka.bindings.validateImages-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validateImages-in-0.contentType=application/json
spring.cloud.stream.bindings.validateImages-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validateImages-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validateImages-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validateImages-in-0.group=tuevGroup
#Output topics
#productValidated
spring.cloud.stream.bindings.validatedProducts-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.validatedProducts-out-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-out-0.producer.partition-count=10
spring.cloud.stream.bindings.validatedProducts-out-0.producer.header-mode=headers
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.contentType=application/json
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.partition-count=10
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.header-mode=headers
spring.cloud.stream.poller.cron=0 0/10 * * * *
spring.cloud.stream.poller.initial-delay=10000
Below are the signatures of all the Spring Cloud Function beans I have defined:
@Bean
public Consumer<Flux<Message<JsonNode>>> merchantCredentials() {
@Bean
public Function<Message<NotificationMessage>, Message<ProductValidatedEvent>> validatedProducts() {
@Bean
public Consumer<Message<ProductImportMessage>> validateImages() {
@PollableBean
@SchedulerLock(name = "retryProcess_scheduledTask", lockAtMostFor = "${retry.job.lock.atMost}", lockAtLeastFor = "${retry.job.lock.atLeast}")
public Supplier<Flux<Message<ProductValidatedEvent>>> retryUnprocessedItems() {
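Everything works: the application starts and runs normally. However, I see this exception in the logs many times, especially during application startup: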
org.springframework.boot.SpringApplication - Application run failed org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
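I have double-checked all of the configuration, but I still do not know how to prevent this. Why does this exception occur? Can it be ignored?
Update 1:
I traced the error to this method in FunctionTypeUtils: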
public static Type discoverFunctionTypeFromClass(Class<?> functionalClass) {
Assert.isTrue(isFunctional(functionalClass), "Type must be one of Supplier, Function or Consumer");
That method is called from this method in FunctionConfiguration:
private String[] filterEligibleFunctionDefinitions() {
...
for (int i = 0; i < functionNames.length && eligibleDefinition; i++) {
String functionName = functionNames[i];
if (this.applicationContext.containsBean(functionName)) {
When I set breakpoints on this method and the previous one, I got the following output:
functionName: merchantCredentials
functionalClass: com.rewedigital.services.tuev.marketplace.merchant.flow.MerchantFlowManger$$Lambda23/0x00000008008fc040
functionName: validatedProducts
functionalClass: com.rewedigital.services.tuev.marketplace.validator.listener.ProductChangedListener$$Lambda31/0x00000008008fa040
functionName: validateImages
functionalClass: com.rewedigital.services.tuev.marketplace.sieve.listener.ProductImagesListener$$Lambda24/0x00000008008fc440
functionName: retryUnprocessedItems
functionalClass: org.springframework.beans.factory.support.NullBean
This shows that retryUnprocessedItems is the culprit, but I am not sure why.
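To make the failing assertion concrete, here is a minimal sketch in plain Java of the kind of type check the message "Type must be one of Supplier, Function or Consumer" suggests. It is a simplified model and not the library's actual code: the class `FunctionalCheckSketch`, its `isFunctional` method, and `NullPlaceholder` (a stand-in for the `NullBean` class seen in the debug output) are all hypothetical names.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalCheckSketch {

    // Hypothetical stand-in for the NullBean placeholder seen in the debug output.
    static final class NullPlaceholder { }

    // Simplified model of the check (an assumption, not the library source):
    // a class passes only if it implements one of the three functional interfaces.
    static boolean isFunctional(Class<?> type) {
        return Supplier.class.isAssignableFrom(type)
                || Function.class.isAssignableFrom(type)
                || Consumer.class.isAssignableFrom(type);
    }

    public static void main(String[] args) {
        Supplier<String> supplier = () -> "ok";
        // A lambda's class implements Supplier, so it passes the check.
        System.out.println(isFunctional(supplier.getClass()));
        // A placeholder object for a null bean implements none of them, so it fails.
        System.out.println(isFunctional(NullPlaceholder.class));
    }
}
```

Under this model, the first three function names resolve to lambda classes that pass the check, while a placeholder for a null bean does not.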
After some investigation, I found that the problem is mainly caused by the @SchedulerLock annotation.
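I observed that the problem occurs when the ShedLock table already holds a lock for the method. The lock then prevents FunctionBeanRegistrar from adding the method, hence the exception.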
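Of course, this means the annotation has to be considered unusable here, because what @PollableBean actually runs is not the annotated method itself but the Supplier lambda that the method returns, which effectively renders ShedLock useless.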
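The difference between locking the bean method and locking the returned Supplier can be sketched in plain Java. This is only a model under stated assumptions: it uses no Spring or ShedLock classes, `tryLock`/`unlock` are hypothetical stand-ins for a distributed lock, and returning `null` from the locked factory models my understanding that a lock interceptor skips the method body when the lock is not acquired.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class PollableLockSketch {

    // Hypothetical stand-in for a distributed lock; true means the lock is free.
    static final AtomicBoolean lockFree = new AtomicBoolean(true);

    static boolean tryLock() { return lockFree.compareAndSet(true, false); }

    static void unlock() { lockFree.set(true); }

    // Models a lock wrapped around the bean factory method itself:
    // if the lock is taken, the body is skipped and no Supplier is produced.
    static Supplier<String> lockedFactory() {
        if (!tryLock()) {
            return null; // a container would end up with a null bean here
        }
        try {
            return () -> "retry batch";
        } finally {
            unlock(); // the lock only ever covered creating the Supplier
        }
    }

    // Alternative shape: always produce the Supplier and lock inside each poll.
    static Supplier<String> factoryWithLockInside() {
        return () -> {
            if (!tryLock()) {
                return null; // this poll is skipped
            }
            try {
                return "retry batch";
            } finally {
                unlock();
            }
        };
    }

    public static void main(String[] args) {
        lockFree.set(false); // pretend another instance holds the lock at startup
        System.out.println(lockedFactory() == null);
        System.out.println(factoryWithLockInside() != null);
        lockFree.set(true);  // lock released
        System.out.println(factoryWithLockInside().get());
    }
}
```

In this model the locked factory yields nothing at all when the lock is held at startup, and even when it succeeds the lock is released before the Supplier is ever polled. How a real binder treats a `null` result from a poll is not covered by this sketch.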
Once I removed the annotation, all the exceptions went away, the sun shone again, the birds sang, and so on.
The next question to answer is how to use a pollable bean in a distributed way, but that is beyond the scope of this question.