Spring Cloud Stream 函数式方法:消息转换生成具有空字段值的对象

Spring Cloud Stream functional approach: message conversion produces an object with empty field values

我正在尝试按照功能方法(Spring Boot:2.3.4,SC:Hoxton.SR9,SC Stream:3.0. 9、SC功能3.0.11)。 问题:自动反序列化的对象具有空字段值。

Json kafka 消息的负载:

{"name":"orga-example"}

代码示例:

 @SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }


    @Bean
    public Consumer<Flux<Organisation>> testFunction() {
        return inboundMessage -> inboundMessage
                .map(message -> {
                    System.out.println("Organisation name is: " + message.getName());
                    return message;
                })
                .subscribe();
    }


    @Data
    static class Organisation {
        private String name;

    }
}

application.yaml

spring:
  application.name: test-application
  cloud:
    stream:
      kafka.binder:
        brokers: kafka1:9092
        autoCreateTopics: false
      bindings:
        testFunction-in-0:
          destination: com.example.test
          group: test-group
      function:
        routing.enabled: true
    function:
      definition: testFunction

日志:

        2020-12-14 23:05:33.933  INFO 74045 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
    2020-12-14 23:05:33.934  INFO 74045 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
    2020-12-14 23:05:33.934  INFO 74045 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1607983533933
    2020-12-14 23:05:33.979  INFO 74045 --- [           main] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: Pm9Q5h9cSECBAZ7n1o_urg
    2020-12-14 23:05:34.001  INFO 74045 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'com.example.test.test-group.errors' has 1 subscriber(s).
    2020-12-14 23:05:34.002  INFO 74045 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'com.example.test.test-group.errors' has 0 subscriber(s).
    2020-12-14 23:05:34.002  INFO 74045 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'com.example.test.test-group.errors' has 1 subscriber(s).
    2020-12-14 23:05:34.002  INFO 74045 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'com.example.test.test-group.errors' has 2 subscriber(s).
    2020-12-14 23:05:34.016  INFO 74045 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 100
        auto.offset.reset = earliest
        bootstrap.servers = [kafka1:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = 
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = test-group
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
    2020-12-14 23:05:34.020  INFO 74045 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
    2020-12-14 23:05:34.020  INFO 74045 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
    2020-12-14 23:05:34.020  INFO 74045 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1607983534020
    2020-12-14 23:05:34.021  INFO 74045 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-group-2, groupId=test-group] Subscribed to topic(s): com.example.test
    2020-12-14 23:05:34.022  INFO 74045 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
    2020-12-14 23:05:34.025  INFO 74045 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@7ec13984
    2020-12-14 23:05:34.038  INFO 74045 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-test-group-2, groupId=test-group] Cluster ID: Pm9Q5h9cSECBAZ7n1o_urg
    2020-12-14 23:05:34.039  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Discovered group coordinator kafka1:9092 (id: 2147483646 rack: null)
    2020-12-14 23:05:34.041  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] (Re-)joining group
    2020-12-14 23:05:34.053  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
    2020-12-14 23:05:34.053  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] (Re-)joining group
    2020-12-14 23:05:34.134  INFO 74045 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
    2020-12-14 23:05:34.156  INFO 74045 --- [           main] com.test.test.TestApplication            : Started TestApplication in 3.966 seconds (JVM running for 4.365)
    2020-12-14 23:05:37.056  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Finished assignment for group at generation 13: {consumer-test-group-2-eb434f26-189c-44ae-90ff-1b4ad8c43553=Assignment(partitions=[com.example.test-0, com.example.test-1, com.example.test-2, com.example.test-3, com.example.test-4, com.example.test-5, com.example.test-6, com.example.test-7])}
    2020-12-14 23:05:37.065  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Successfully joined group with generation 13
    2020-12-14 23:05:37.068  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Adding newly assigned partitions: com.example.test-1, com.example.test-2, com.example.test-3, com.example.test-4, com.example.test-0, com.example.test-5, com.example.test-6, com.example.test-7
    2020-12-14 23:05:37.075  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Found no committed offset for partition com.example.test-1
...
    2020-12-14 23:05:37.076  INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-2, groupId=test-group] Setting offset for partition com.example.test-0 to the committed offset FetchPosition{offset=8, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1:9092 (id: 1 rack: null)], epoch=absent}}
...
    Organisation name is: null

命令式方法一切正常:

    @SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @Bean
    public Consumer<Organisation> testFunction() {
        return inboundMessage -> System.out.println("Organisation name is: " + inboundMessage.getName());
    }
    
    @Data
    static class Organisation {
        private String name;

    }
}

日志:

2020-12-14 23:22:51.116  INFO 75026 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2020-12-14 23:22:52.121  INFO 75026 --- [           main] o.s.cloud.commons.util.InetUtils         : Cannot determine local hostname
2020-12-14 23:22:53.126  INFO 75026 --- [           main] o.s.cloud.commons.util.InetUtils         : Cannot determine local hostname
2020-12-14 23:22:53.147  INFO 75026 --- [           main] com.test.test.TestApplication            : Started TestApplication in 20.799 seconds (JVM running for 21.176)
2020-12-14 23:23:02.994  INFO 75026 --- [container-0-C-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
Organisation name is: orga-example

我也试过 运行 一个示例应用程序:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/processor-samples/sensor-average-reactive-kafka 我遇到了同样的问题。我得到的输出是:

2020-12-14 20:13:49.827  INFO 63080 --- [container-0-C-1] nsorAverageProcessorApplication$TestSink : Data received: {"id":0,"average":0.0}
2020-12-14 20:13:52.802  INFO 63080 --- [container-0-C-1] nsorAverageProcessorApplication$TestSink : Data received: {"id":0,"average":0.0}
2020-12-14 20:13:55.801  INFO 63080 --- [container-0-C-1] nsorAverageProcessorApplication$TestSink : Data received: {"id":0,"average":0.0}

任何提示或建议都会非常有帮助和感激:)

根据 https://github.com/spring-cloud/spring-cloud-stream/issues/2056,此问题与 spring-cloud-function 的 3.0.11-RELEASE 有关。

将 spring-cloud-function 降级到 3.0.10-RELEASE 暂时解决了这个问题:

    <dependencyManagement>
    <dependencies>
        <!-- This can be removed when this issue is fixed:
      https://github.com/spring-cloud/spring-cloud-stream/issues/2056 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-dependencies</artifactId>
            <version>3.0.10.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>Horsham.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR9</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>