升级到 Spring Boot 2.6.1 和 Spring Cloud 2021.0.0 后 Spring Cloud Stream 中的启动错误
Startup error in Spring Cloud Stream after upgrading to Spring Boot 2.6.1 and Spring Cloud 2021.0.0
我刚刚将使用 Spring Cloud Stream Kafka 生产者和消费者的 Spring 引导应用程序升级到
plugins {
id("org.springframework.boot") version "2.6.1"
...
}
extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"
应用程序不再启动,出现以下异常:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.doApply(SimpleFunctionRegistry.java:256)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
... 16 common frames omitted
我是否遗漏了任何升级指南,或者这是一个错误?
制作人
@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) {
private val logger = LoggerFactory.getLogger(javaClass)
fun send(event: EventReceived): Mono<EventReceived> {
return Mono.defer {
val message = MessageBuilder.withPayload(event)
.setHeader(MESSAGE_KEY, event.id)
.setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
.build()
logger.info("Sending event {}", event)
while (eventProcessor.tryEmitNext(message).isFailure) {
LockSupport.parkNanos(10)
}
event.toMono()
}.subscribeOn(Schedulers.boundedElastic())
}
消费者
@Configuration
class MetricConsumer(...) {
private val logger = LoggerFactory.getLogger(javaClass)
@Bean
fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> {
...
}
这看起来像是 s-c-sleuth 中的错误。我会跟进 Marcin 的情况。
你能不能也请 post 你的函数的签名,需要确认什么?
同时,您可以通过将 spring.sleuth.function.enabled
设置为 false
.
来暂时断开 sleuth 的 TraceFunctionAroundWrapper
我刚刚将使用 Spring Cloud Stream Kafka 生产者和消费者的 Spring 引导应用程序升级到
plugins {
id("org.springframework.boot") version "2.6.1"
...
}
extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"
应用程序不再启动,出现以下异常:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.doApply(SimpleFunctionRegistry.java:256)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
... 16 common frames omitted
我是否遗漏了任何升级指南,或者这是一个错误?
制作人
@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) {
private val logger = LoggerFactory.getLogger(javaClass)
fun send(event: EventReceived): Mono<EventReceived> {
return Mono.defer {
val message = MessageBuilder.withPayload(event)
.setHeader(MESSAGE_KEY, event.id)
.setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
.build()
logger.info("Sending event {}", event)
while (eventProcessor.tryEmitNext(message).isFailure) {
LockSupport.parkNanos(10)
}
event.toMono()
}.subscribeOn(Schedulers.boundedElastic())
}
消费者
@Configuration
class MetricConsumer(...) {
private val logger = LoggerFactory.getLogger(javaClass)
@Bean
fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> {
...
}
这看起来像是 s-c-sleuth 中的错误。我会跟进 Marcin 的情况。
你能不能也请 post 你的函数的签名,需要确认什么?
同时,您可以通过将 spring.sleuth.function.enabled
设置为 false
.
TraceFunctionAroundWrapper