Messages getting omitted in Spring Cloud Stream Kafka in Reactive
I have a Spring Cloud application that uses Spring Reactor core to listen on two topics, each with 10 partitions.
In the consumer I simply read each message and print its topic, partition, and offset; some messages are never read.
I have tried both auto-commit and manual acknowledgement.
Test setup:
Push 30K messages to each of Topic1 and Topic2 and start the application; it reads only 59999 records instead of 60000.
Lag on all partitions of both topics is 0, indicating that all data has been consumed.
Receiver code:
@Bean
public Consumer<Flux<Message<String>>> receiver() {
    return sink -> sink
        .doOnNext(record -> {
            String topic = (String) record.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
            Long offset = (Long) record.getHeaders().get(KafkaHeaders.OFFSET);
            Integer partition = (Integer) record.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
            log.startstate(() -> format("Register Topic %s partition %d offset %d", topic, partition, offset));
        })
        .subscribe();
}
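The header lookup and log formatting above boil down to plain Java. A minimal self-contained sketch (a `Map` stands in for `MessageHeaders`, and the literal keys are the string values behind the `KafkaHeaders` constants in spring-kafka 2.x):

```java
import java.util.Map;

public class HeaderDemo {
    // Mirrors the doOnNext body: pull topic/partition/offset out of the
    // message headers and format the log line.
    static String registerLine(Map<String, Object> headers) {
        String topic = (String) headers.get("kafka_receivedTopic");
        Long offset = (Long) headers.get("kafka_offset");
        Integer partition = (Integer) headers.get("kafka_receivedPartitionId");
        return String.format("Register Topic %s partition %d offset %d", topic, partition, offset);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(
                "kafka_receivedTopic", "Topic1",
                "kafka_offset", 76030L,
                "kafka_receivedPartitionId", 2);
        System.out.println(registerLine(headers));
    }
}
```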
My application.yml contains the following:
spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            autoCommitOffset: true
        bindings:
          receiver-in-0:
            consumer:
              autoCommitOffset: true
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          minPartitionCount: 10
          auto-offset-reset: earliest
      bindings:
        receiver-in-0:
          binder: kafka
          destination: Topic1,Topic2
          content-type: text/plain;charset=UTF-8
          group: input-group-1
          max-attempts: 5
          back-off-initial-interval: 10000
          back-off-max-interval: 30000
        emitter-out-0:
          binder: kafka
          producer:
            partition-count: 2
            partition-key-extractor-name: EmitterPartitionKey
        erroremitter-out-0:
          binder: kafka
          destination: error
        error:
          binder: kafka
          destination: error
spring.cloud.stream.function.definition: receiver;emitter;erroremitter
My log file shows that the consumer never read Topic1, partition 2, offset 76031; it jumps from 76030 straight to 76032:
[STARTSTATE] 2020-05-22 11:40:01.033 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic1 partition 2 offset 76030
[STARTSTATE] 2020-05-22 11:40:01.034 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic2 partition 7 offset 86149
[STARTSTATE] 2020-05-22 11:40:01.034 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic2 partition 7 offset 86150
[STARTSTATE] 2020-05-22 11:40:01.034 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic1 partition 2 offset 76032
Relevant sections of pom.xml:
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <java.version>11</java.version>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
  <spring-boot.version>2.1.7.RELEASE</spring-boot.version>
  <spring-cloud.version>Hoxton.SR4</spring-cloud.version>
  <skipTests>true</skipTests>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
      <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.0.4.RELEASE</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-reactive</artifactId>
    <version>2.2.1.RELEASE</version>
  </dependency>
</dependencies>
I reproduced your problem and will open an issue against Spring Cloud Stream (not yet sure whether it is a stream problem or a Reactor problem).
When Reactor is not involved, i.e. with a plain Consumer<Message<String>>, I see no missing records.
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/906
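For anyone hitting this before the issue is resolved, the non-reactive form of the same consumer is only a small change; a sketch, assuming the rest of the question's setup (same binding name receiver-in-0, same custom log.startstate logger) stays as-is:

```java
// Non-reactive workaround: let the binder deliver one Message at a time
// instead of a Flux, which avoided the skipped records in my test.
@Bean
public Consumer<Message<String>> receiver() {
    return record -> {
        String topic = (String) record.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        Long offset = (Long) record.getHeaders().get(KafkaHeaders.OFFSET);
        Integer partition = (Integer) record.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
        log.startstate(() -> format("Register Topic %s partition %d offset %d", topic, partition, offset));
    };
}
```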