java.lang.ClassCastException: class .$Proxy143 cannot be cast to class .MessageChannel (... are in unnamed module of loader 'app')

I'm writing tests for a Spring Cloud Stream application. It has a KStream that reads from topicA. In the test, I publish a message with a KafkaTemplate and wait for the KStream's log output to appear.

The test throws the following exception:

java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy143 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
    at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.1.RELEASE.jar:3.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:115) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]

This exception does not occur when the application runs normally.

KStream:

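import org.apache.kafka.streams.kstream.KStream
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.function.Consumer

@Configuration
class MyKStream {

    private val logger = LoggerFactory.getLogger(javaClass)

    // Functional-style consumer; bound to the processSomething-in-0 input binding.
    @Bean
    fun processSomething(): Consumer<KStream<XX, XX>> {
        return Consumer { something ->
            something.foreach { key, value ->
                logger.info("--------> Processing xxx key {} - value {}", key, value)
            }
        }
    }
}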

Test:

@TestInstance(PER_CLASS)
@EmbeddedKafka
@SpringBootTest(properties = [
    "spring.profiles.active=local",
    "schema-registry.user=",
    "schema-registry.password=",
    "spring.cloud.stream.bindings.processSomething-in-0.destination=topicA",
    "spring.cloud.stream.bindings.processSomething-in-0.producer.useNativeEncoding=true",
    "spring.cloud.stream.bindings.processSomethingElse-in-0.destination=topicB",
    "spring.cloud.stream.bindings.processSomethingElse-in-0.producer.useNativeEncoding=true",
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
    "spring.cloud.stream.function.definition=processSomething;processSomethingElse"])
class MyKStreamTests {

    private val logger = LoggerFactory.getLogger(javaClass)

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Autowired
    private lateinit var schemaRegistryMock: SchemaRegistryMock

    @AfterAll
    fun afterAll() {
        embeddedKafka.kafkaServers.forEach { it.shutdown() }
        embeddedKafka.kafkaServers.forEach { it.awaitShutdown() }
    }

    @Test
    fun `should send and process something`() {

        val producer = createProducer()
        logger.debug("**********----> presend")
        val msg = MessageBuilder.withPayload(xxx)
                .setHeader(KafkaHeaders.MESSAGE_KEY, xxx)
                .setHeader(KafkaHeaders.TIMESTAMP, 1L)
                .build()
        producer.send(msg).get()
        logger.debug("**********----> sent")

        Thread.sleep(100000)
    }
}

@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {

    private val schemaRegistryMock = SchemaRegistryMock()

    @PostConstruct
    fun init() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)
        schemaRegistryMock.start()
        System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
    }

    @Bean
    fun schemaRegistryMock(): SchemaRegistryMock {
        return schemaRegistryMock
    }

    @PreDestroy
    fun preDestroy() {
        schemaRegistryMock.stop()
    }
}

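You probably have spring-cloud-stream-test-support on the test classpath as a dependency. That dependency bypasses some of the core functionality of the binder API, which causes this error: the stack trace shows TestSupportBinder.bindConsumer handling the binding and trying to cast the binding target to MessageChannel, while the target here is a proxy that does not implement MessageChannel. Removing the dependency from the tests should let the Kafka Streams binder handle the binding instead.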
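
To illustrate why a cast like the one in the stack trace fails, here is a minimal, self-contained Kotlin sketch. It does not use Spring at all: StreamLike and ChannelLike are hypothetical stand-ins for KStream and MessageChannel, and bindAsChannel is a hypothetical helper that only mimics a binder casting every binding target to the channel type. The point is simply that a JDK dynamic proxy implements only the interfaces it was created with.

```kotlin
import java.lang.reflect.Proxy

// Hypothetical stand-ins: StreamLike plays the role of KStream,
// ChannelLike plays the role of MessageChannel.
interface StreamLike { fun describe(): String }
interface ChannelLike { fun send(payload: String): Boolean }

// Builds a JDK dynamic proxy that implements only StreamLike.
fun newStreamProxy(): Any =
    Proxy.newProxyInstance(
        StreamLike::class.java.classLoader,
        arrayOf<Class<*>>(StreamLike::class.java)
    ) { _, method, _ ->
        // Only describe() is handled in this sketch.
        if (method.name == "describe") "stream proxy" else null
    }

// Mimics a binder that casts every binding target to the channel type.
fun bindAsChannel(target: Any): String =
    try {
        (target as ChannelLike).send("hello")
        "bound"
    } catch (e: ClassCastException) {
        "ClassCastException"
    }

fun main() {
    val proxy = newStreamProxy()
    println((proxy as StreamLike).describe()) // the proxy works as a StreamLike
    println(bindAsChannel(proxy))             // but cannot be cast to ChannelLike
}
```

The proxy class is generated at runtime (hence names such as $Proxy143), so the only reliable way to know what it can be cast to is to look at the interfaces it was created with.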

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.3.RELEASE/reference/html/spring-cloud-stream.html#_testing
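
As a rough sketch of what removing the dependency could look like, assuming the project is built with Gradle and the Kotlin DSL (the question does not say which build tool is used, so this is an assumption), the test-support artifact can be excluded from all configurations. If the dependency is declared directly in the build file, deleting that declaration is enough.

```kotlin
// build.gradle.kts (hypothetical fragment; assumes a Gradle Kotlin DSL build)
configurations.all {
    // Keep the test binder off the classpath so that the Kafka Streams
    // binder is the one that binds the KStream input.
    exclude(group = "org.springframework.cloud", module = "spring-cloud-stream-test-support")
}
```

With Maven, the equivalent would be to remove the dependency entry or add an exclusion for the same artifact.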