使用 Batch 从 Kafka 读取数据无法使用 SpringBoot 正常工作
Read data from Kafka using Batch not work correctly using SpringBoot
我使用 SpringBoot
并希望使用批处理从 Kafka
读取数据。我的 application.yml
看起来像这样:
spring:
kafka:
bootstrap-servers:
- localhost:9092
properties:
schema.registry.url: http://localhost:8081
consumer:
auto-offset-reset: earliest
max-poll-records: 50000
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: "batch"
properties:
fetch.min.bytes: 1000000
fetch.max.wait.ms: 20000
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
listener:
type: batch
我的听众:
@KafkaListener(id = "bar2", topics = "TestTopic")
public void listen(List<ConsumerRecord<String, GenericRecord>> records) {
log.info("start of batch receive. Size::{}", records.size());
}
在日志中我看到:
2019-10-04 11:08:19.693 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33279
2019-10-04 11:08:19.746 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33353
2019-10-04 11:08:19.784 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33400
2019-10-04 11:08:19.821 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33556
2019-10-04 11:08:39.859 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::16412
我设置了所需的设置:fetch.min.bytes
和 fetch.max.wait.ms
,但它们没有任何效果。
在日志中我看到一个包在任何设置下的大小都不超过 33000。我崩溃了,我不明白为什么会这样?
max.poll.records
只是最大值。
还有其他属性会影响您获得的记录数
fetch.min.bytes
- 服务器应该 return 获取请求的最小数据量。如果可用数据不足,则请求将等待积累足够多的数据,然后再回答请求。 1 字节的默认设置意味着一旦单个字节的数据可用或获取请求在等待数据到达时超时,就会立即响应获取请求。将此设置为大于 1 的值将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会增加一些延迟。
fetch.max.wait.ms
- 如果没有足够的数据立即满足 fetch.min.bytes. 给出的要求,服务器在回答获取请求之前将阻塞的最长时间
无法精确控制最小记录数(除非它们的长度都相同)。
我使用 SpringBoot
并希望使用批处理从 Kafka
读取数据。我的 application.yml
看起来像这样:
spring:
kafka:
bootstrap-servers:
- localhost:9092
properties:
schema.registry.url: http://localhost:8081
consumer:
auto-offset-reset: earliest
max-poll-records: 50000
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: "batch"
properties:
fetch.min.bytes: 1000000
fetch.max.wait.ms: 20000
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
listener:
type: batch
我的听众:
@KafkaListener(id = "bar2", topics = "TestTopic")
public void listen(List<ConsumerRecord<String, GenericRecord>> records) {
log.info("start of batch receive. Size::{}", records.size());
}
在日志中我看到:
2019-10-04 11:08:19.693 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33279
2019-10-04 11:08:19.746 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33353
2019-10-04 11:08:19.784 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33400
2019-10-04 11:08:19.821 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::33556
2019-10-04 11:08:39.859 INFO 2123 --- [ bar2-0-C-1] kafka.batch.demo.DemoApplication : start of batch receive. Size::16412
我设置了所需的设置:fetch.min.bytes
和 fetch.max.wait.ms
,但它们没有任何效果。
在日志中我看到一个包在任何设置下的大小都不超过 33000。我崩溃了,我不明白为什么会这样?
max.poll.records
只是最大值。
还有其他属性会影响您获得的记录数
fetch.min.bytes
- 服务器应该 return 获取请求的最小数据量。如果可用数据不足,则请求将等待积累足够多的数据,然后再回答请求。 1 字节的默认设置意味着一旦单个字节的数据可用或获取请求在等待数据到达时超时,就会立即响应获取请求。将此设置为大于 1 的值将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会增加一些延迟。fetch.max.wait.ms
- 如果没有足够的数据立即满足 fetch.min.bytes. 给出的要求,服务器在回答获取请求之前将阻塞的最长时间
无法精确控制最小记录数(除非它们的长度都相同)。