升级到 Spring Boot 2.6.1 和 Spring Cloud 2021.0.0 后 Spring Cloud Stream 中的启动错误

Startup error in Spring Cloud Stream after upgrading to Spring Boot 2.6.1 and Spring Cloud 2021.0.0

我刚刚将使用 Spring Cloud Stream Kafka 生产者和消费者的 Spring 引导应用程序升级到

plugins {
    id("org.springframework.boot") version "2.6.1"
    ...
}
extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"

应用程序不再启动,出现以下异常:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
    at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.doApply(SimpleFunctionRegistry.java:256)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
    ... 16 common frames omitted

我是否遗漏了任何升级指南,或者这是一个错误?

制作人

@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) {

    private val logger = LoggerFactory.getLogger(javaClass)

    fun send(event: EventReceived): Mono<EventReceived> {
        return Mono.defer {
            val message = MessageBuilder.withPayload(event)
                .setHeader(MESSAGE_KEY, event.id)
                .setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
                .build()
            logger.info("Sending event {}", event)
            while (eventProcessor.tryEmitNext(message).isFailure) {
                LockSupport.parkNanos(10)
            }
            event.toMono()
        }.subscribeOn(Schedulers.boundedElastic())
    }

消费者

@Configuration
class MetricConsumer(...) {

    private val logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> {
        ...
    }

这看起来像是 s-c-sleuth 中的错误。我会跟进 Marcin 的情况。 你能不能也请 post 你的函数的签名,需要确认什么? 同时,您可以通过将 spring.sleuth.function.enabled 设置为 false.

来暂时断开 sleuth 的 TraceFunctionAroundWrapper