Spring Cloud Stream functional approach: message conversion produces an object with empty field values
I am trying to follow the functional approach (Spring Boot 2.3.4, Spring Cloud Hoxton.SR9, Spring Cloud Stream 3.0.9, Spring Cloud Function 3.0.11).
The problem: the automatically deserialized object has null field values.
JSON payload of the Kafka message:
{"name":"orga-example"}
Code sample:
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import lombok.Data;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @Bean
    public Consumer<Flux<Organisation>> testFunction() {
        return inboundMessage -> inboundMessage
                .map(message -> {
                    System.out.println("Organisation name is: " + message.getName());
                    return message;
                })
                .subscribe();
    }

    @Data
    static class Organisation {
        private String name;
    }
}
application.yaml
spring:
  application.name: test-application
  cloud:
    stream:
      kafka.binder:
        brokers: kafka1:9092
        autoCreateTopics: false
      bindings:
        testFunction-in-0:
          destination: com.example.test
          group: test-group
    function:
      routing.enabled: true
      definition: testFunction
Log output:
2020-12-14 23:05:33.933 INFO 74045 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2020-12-14 23:05:33.934 INFO 74045 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
2020-12-14 23:05:33.934 INFO 74045 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1607983533933
2020-12-14 23:05:33.979 INFO 74045 --- [ main] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: Pm9Q5h9cSECBAZ7n1o_urg
2020-12-14 23:05:34.001 INFO 74045 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'com.example.test.test-group.errors' has 1 subscriber(s).
2020-12-14 23:05:34.002 INFO 74045 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'com.example.test.test-group.errors' has 0 subscriber(s).
2020-12-14 23:05:34.002 INFO 74045 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'com.example.test.test-group.errors' has 1 subscriber(s).
2020-12-14 23:05:34.002 INFO 74045 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'com.example.test.test-group.errors' has 2 subscriber(s).
2020-12-14 23:05:34.016 INFO 74045 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [kafka1:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2020-12-14 23:05:34.020 INFO 74045 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2020-12-14 23:05:34.020 INFO 74045 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
2020-12-14 23:05:34.020 INFO 74045 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1607983534020
2020-12-14 23:05:34.021 INFO 74045 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-test-group-2, groupId=test-group] Subscribed to topic(s): com.example.test
2020-12-14 23:05:34.022 INFO 74045 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-12-14 23:05:34.025 INFO 74045 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@7ec13984
2020-12-14 23:05:34.038 INFO 74045 --- [container-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-test-group-2, groupId=test-group] Cluster ID: Pm9Q5h9cSECBAZ7n1o_urg
2020-12-14 23:05:34.039 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Discovered group coordinator kafka1:9092 (id: 2147483646 rack: null)
2020-12-14 23:05:34.041 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] (Re-)joining group
2020-12-14 23:05:34.053 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-12-14 23:05:34.053 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] (Re-)joining group
2020-12-14 23:05:34.134 INFO 74045 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2020-12-14 23:05:34.156 INFO 74045 --- [ main] com.test.test.TestApplication : Started TestApplication in 3.966 seconds (JVM running for 4.365)
2020-12-14 23:05:37.056 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Finished assignment for group at generation 13: {consumer-test-group-2-eb434f26-189c-44ae-90ff-1b4ad8c43553=Assignment(partitions=[com.example.test-0, com.example.test-1, com.example.test-2, com.example.test-3, com.example.test-4, com.example.test-5, com.example.test-6, com.example.test-7])}
2020-12-14 23:05:37.065 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Successfully joined group with generation 13
2020-12-14 23:05:37.068 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Adding newly assigned partitions: com.example.test-1, com.example.test-2, com.example.test-3, com.example.test-4, com.example.test-0, com.example.test-5, com.example.test-6, com.example.test-7
2020-12-14 23:05:37.075 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Found no committed offset for partition com.example.test-1
...
2020-12-14 23:05:37.076 INFO 74045 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-2, groupId=test-group] Setting offset for partition com.example.test-0 to the committed offset FetchPosition{offset=8, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1:9092 (id: 1 rack: null)], epoch=absent}}
...
Organisation name is: null
With the imperative approach everything works as expected:
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import lombok.Data;

@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @Bean
    public Consumer<Organisation> testFunction() {
        return inboundMessage -> System.out.println("Organisation name is: " + inboundMessage.getName());
    }

    @Data
    static class Organisation {
        private String name;
    }
}
Log output:
2020-12-14 23:22:51.116 INFO 75026 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2020-12-14 23:22:52.121 INFO 75026 --- [ main] o.s.cloud.commons.util.InetUtils : Cannot determine local hostname
2020-12-14 23:22:53.126 INFO 75026 --- [ main] o.s.cloud.commons.util.InetUtils : Cannot determine local hostname
2020-12-14 23:22:53.147 INFO 75026 --- [ main] com.test.test.TestApplication : Started TestApplication in 20.799 seconds (JVM running for 21.176)
2020-12-14 23:23:02.994 INFO 75026 --- [container-0-C-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
Organisation name is: orga-example
I have also tried running the sample application at https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/processor-samples/sensor-average-reactive-kafka and ran into the same problem. The output I get is:
2020-12-14 20:13:49.827 INFO 63080 --- [container-0-C-1] nsorAverageProcessorApplication$TestSink : Data received: {"id":0,"average":0.0}
2020-12-14 20:13:52.802 INFO 63080 --- [container-0-C-1] nsorAverageProcessorApplication$TestSink : Data received: {"id":0,"average":0.0}
2020-12-14 20:13:55.801 INFO 63080 --- [container-0-C-1] nsorAverageProcessorApplication$TestSink : Data received: {"id":0,"average":0.0}
Any hints or suggestions would be very helpful and appreciated :)
According to https://github.com/spring-cloud/spring-cloud-stream/issues/2056, this problem is related to version 3.0.11.RELEASE of spring-cloud-function.
Downgrading spring-cloud-function to 3.0.10.RELEASE resolves the issue for now:
<dependencyManagement>
    <dependencies>
        <!-- This can be removed when this issue is fixed:
             https://github.com/spring-cloud/spring-cloud-stream/issues/2056 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-dependencies</artifactId>
            <version>3.0.10.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>Horsham.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR9</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
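To confirm that Maven actually resolves the downgraded version after this BOM override, one can inspect the dependency tree. This is a sketch, not part of the original answer; the `-Dincludes` filter pattern below is an assumption about the artifact coordinates on your classpath:

```shell
# Show only the resolved spring-cloud-function artifacts in the dependency tree
mvn dependency:tree -Dincludes=org.springframework.cloud:spring-cloud-function-*
```

If the override took effect, the listed artifacts should report 3.0.10.RELEASE rather than 3.0.11.RELEASE.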