Spring Kafka: Polls only 1 record when in batch listener mode

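I am running a Spring Kafka consumer, and I want to poll a given topic every 10 seconds and fetch either all available records or up to a maximum number that I specify. The topic contains base64-encoded image strings, typically of 700x400 images. Below is my configuration: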

@Bean
public ConsumerFactory<String, String> consumerConfig() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
    config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> listener = new ConcurrentKafkaListenerContainerFactory<>();
    listener.setBatchListener(true);
    listener.getContainerProperties().setIdleBetweenPolls(10000);
    listener.setConsumerFactory(consumerConfig());
    listener.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return listener;
}

Below is my listener:

@KafkaListener(id = "feedconsumer", topicPattern = ".*_hello")
public void messageListener(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
    log.info(String.valueOf(records.size()));
    acknowledgment.acknowledge();
}

In my logs I only see this:

2021-03-29 17:48:12.793  INFO 25102 --- [dconsumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : feedconsumer: partitions assigned: [test_hello-0]
2021-03-29 17:48:13.338 DEBUG 25102 --- [dconsumer-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2021-03-29 17:48:13.341 DEBUG 25102 --- [dconsumer-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4f27e57e, headers={id=a9dea384-5f4a-5a59-22ad-45be4ac0c819, timestamp=1617020279053}]]
2021-03-29 17:48:13.342  INFO 25102 --- [dconsumer-0-C-1] c.r.i.t.m.s.s.i.KafkaConsumerServiceImpl : 1
2021-03-29 17:48:13.344 DEBUG 25102 --- [dconsumer-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {test_hello-0=OffsetAndMetadata{offset=92, leaderEpoch=null, metadata=''}}
2021-03-29 17:48:23.359 DEBUG 25102 --- [dconsumer-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records

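As you can see, even with the batch listener enabled and the maximum number of records set to 2000, I only get 1 record every 10 seconds. What am I missing?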

Edit: I also tried the following configuration

    config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10000000);
    config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50000000);
    config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);

More logs:

2021-03-30 13:15:10.835 DEBUG 34356 --- [dconsumer-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=org.springframework.kafka.support.KafkaNull@b4ddc5, headers={id=72ae298d-1a89-d632-342a-282569e5c400, timestamp=1617090254725}]]
2021-03-30 13:15:10.836  INFO 34356 --- [dconsumer-0-C-1] c.r.i.t.m.s.s.i.KafkaConsumerServiceImpl : 1
2021-03-30 13:15:10.836 DEBUG 34356 --- [dconsumer-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {test_hello-0=OffsetAndMetadata{offset=46, leaderEpoch=null, metadata=''}}
2021-03-30 13:15:10.836 DEBUG 34356 --- [dconsumer-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-feedconsumer-1, correlationId=59) and timeout 120000 to node 2147483646: {group_id=feedconsumer,generation_id=7,member_id=consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee,group_instance_id=null,topics=[{name=test_hello,partitions=[{partition_index=0,committed_offset=46,committed_leader_epoch=-1,committed_metadata=,_tagged_fields={}}],_tagged_fields={}}],_tagged_fields={}}
2021-03-30 13:15:10.847 DEBUG 34356 --- [dconsumer-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-feedconsumer-1, correlationId=59): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='test_hello', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
2021-03-30 13:15:10.848 DEBUG 34356 --- [dconsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Committed offset 46 for partition test_hello-0
2021-03-30 13:15:11.015 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-feedconsumer-1, correlationId=58): org.apache.kafka.common.requests.FetchResponse@66229066
2021-03-30 13:15:11.015 DEBUG 34356 --- [ng-feedconsumer] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Node 0 sent an incremental fetch response with throttleTimeMs = 1 for session 1615838501 with 1 response partition(s)
2021-03-30 13:15:11.016 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Fetch READ_UNCOMMITTED at offset 46 for partition test_hello-0 returned fetch data (error=NONE, highWaterMark=4513, lastStableOffset = 4513, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
2021-03-30 13:15:12.263 DEBUG 34356 --- [alina-utility-2] org.apache.catalina.session.ManagerBase  : Start expire sessions StandardManager at 1617090312260 sessioncount 0
2021-03-30 13:15:12.264 DEBUG 34356 --- [alina-utility-2] org.apache.catalina.session.ManagerBase  : End expire sessions StandardManager processingTime 4 expired sessions: 0
2021-03-30 13:15:12.751 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending Heartbeat request with generation 7 and member id consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee to coordinator 192.168.1.3:9092 (id: 2147483646 rack: null)
2021-03-30 13:15:12.752 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=60) and timeout 120000 to node 2147483646: {group_id=feedconsumer,generation_id=7,member_id=consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee,group_instance_id=null,_tagged_fields={}}
2021-03-30 13:15:12.858 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=60): org.apache.kafka.common.requests.HeartbeatResponse@2e91937c
2021-03-30 13:15:12.858 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received successful Heartbeat response
2021-03-30 13:15:15.831 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending Heartbeat request with generation 7 and member id consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee to coordinator 192.168.1.3:9092 (id: 2147483646 rack: null)
2021-03-30 13:15:15.831 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=61) and timeout 120000 to node 2147483646: {group_id=feedconsumer,generation_id=7,member_id=consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee,group_instance_id=null,_tagged_fields={}}
2021-03-30 13:15:15.937 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=61): org.apache.kafka.common.requests.HeartbeatResponse@124bda17
2021-03-30 13:15:15.937 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received successful Heartbeat response
2021-03-30 13:15:18.906 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending Heartbeat request with generation 7 and member id consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee to coordinator 192.168.1.3:9092 (id: 2147483646 rack: null)
2021-03-30 13:15:18.907 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=62) and timeout 120000 to node 2147483646: {group_id=feedconsumer,generation_id=7,member_id=consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee,group_instance_id=null,_tagged_fields={}}
2021-03-30 13:15:19.012 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=62): org.apache.kafka.common.requests.HeartbeatResponse@50bb3548
2021-03-30 13:15:19.012 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received successful Heartbeat response
2021-03-30 13:15:20.857 DEBUG 34356 --- [dconsumer-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Added READ_UNCOMMITTED fetch request for partition test_hello-0 at position FetchPosition{offset=47, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[192.168.1.3:9092 (id: 1 rack: null)], epoch=0}} to node 192.168.1.3:9092 (id: 1 rack: null)
2021-03-30 13:15:20.857 DEBUG 34356 --- [dconsumer-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Built incremental fetch (sessionId=1615838501, epoch=6) for node 1. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
2021-03-30 13:15:20.857 DEBUG 34356 --- [dconsumer-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test_hello-0), toForget=(), implied=()) to broker 192.168.1.3:9092 (id: 1 rack: null)
2021-03-30 13:15:20.857 DEBUG 34356 --- [dconsumer-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-feedconsumer-1, correlationId=63) and timeout 120000 to node 1: {replica_id=-1,max_wait_time=10000,min_bytes=10000000,max_bytes=50000000,isolation_level=0,session_id=1615838501,session_epoch=6,topics=[{topic=test_hello,partitions=[{partition=0,current_leader_epoch=0,fetch_offset=47,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[],rack_id=}
2021-03-30 13:15:20.858 DEBUG 34356 --- [dconsumer-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2021-03-30 13:15:20.858 DEBUG 34356 --- [dconsumer-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=org.springframework.kafka.support.KafkaNull@b4ddc5, headers={id=72ae298d-1a89-d632-342a-282569e5c400, timestamp=1617090254725}]]
2021-03-30 13:15:20.859  INFO 34356 --- [dconsumer-0-C-1] c.r.i.t.m.s.s.i.KafkaConsumerServiceImpl : 1
2021-03-30 13:15:20.859 DEBUG 34356 --- [dconsumer-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {test_hello-0=OffsetAndMetadata{offset=47, leaderEpoch=null, metadata=''}}
2021-03-30 13:15:20.860 DEBUG 34356 --- [dconsumer-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-feedconsumer-1, correlationId=64) and timeout 120000 to node 2147483646: {group_id=feedconsumer,generation_id=7,member_id=consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee,group_instance_id=null,topics=[{name=test_hello,partitions=[{partition_index=0,committed_offset=47,committed_leader_epoch=-1,committed_metadata=,_tagged_fields={}}],_tagged_fields={}}],_tagged_fields={}}
2021-03-30 13:15:20.866 DEBUG 34356 --- [dconsumer-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-feedconsumer-1, correlationId=64): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='test_hello', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
2021-03-30 13:15:20.867 DEBUG 34356 --- [dconsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Committed offset 47 for partition test_hello-0
2021-03-30 13:15:21.164 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-feedconsumer-1, correlationId=63): org.apache.kafka.common.requests.FetchResponse@22e83e99
2021-03-30 13:15:21.165 DEBUG 34356 --- [ng-feedconsumer] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Node 0 sent an incremental fetch response with throttleTimeMs = 1 for session 1615838501 with 1 response partition(s)
2021-03-30 13:15:21.165 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Fetch READ_UNCOMMITTED at offset 47 for partition test_hello-0 returned fetch data (error=NONE, highWaterMark=4563, lastStableOffset = 4563, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
2021-03-30 13:15:21.991 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending Heartbeat request with generation 7 and member id consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee to coordinator 192.168.1.3:9092 (id: 2147483646 rack: null)
2021-03-30 13:15:21.992 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=65) and timeout 120000 to node 2147483646: {group_id=feedconsumer,generation_id=7,member_id=consumer-feedconsumer-1-ca5f91a1-e17b-40ad-a98f-770abbba1cee,group_instance_id=null,_tagged_fields={}}
2021-03-30 13:15:22.093 DEBUG 34356 --- [ng-feedconsumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-feedconsumer-1, correlationId=65): org.apache.kafka.common.requests.HeartbeatResponse@4053cb
2021-03-30 13:15:22.093 DEBUG 34356 --- [ng-feedconsumer] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-feedconsumer-1, groupId=feedconsumer] Received successful Heartbeat response

Try the following settings:

    config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10000000);
    config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 250000000);
    config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
    config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 50000000);

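Your messages appear to be too large for more than one to fit into a single fetch. The logs show partition_max_bytes=1048576 in the FETCH request and recordsSizeInBytes=1048576 in each response, so every fetch is capped at the 1 MiB default of max.partition.fetch.bytes, regardless of fetch.max.bytes or max.poll.records. To read more records per poll, add the max.partition.fetch.bytes property.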
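As a rough illustration of why the per-partition limit matters here, the following sketch estimates how many records one fetch from a single partition can carry. It is only an approximation under stated assumptions: the class and method names are hypothetical, the 700,000-byte record size is an assumed figure (the question does not state the actual size), and each record is treated as its own batch. The sketch does not talk to Kafka at all; it only does the arithmetic.

```java
/**
 * Back-of-the-envelope estimate of records per fetch for one partition.
 * Hypothetical helper, not part of Kafka or Spring Kafka.
 */
class FetchSizeEstimate {

    // Documented default of max.partition.fetch.bytes (1 MiB); it matches
    // partition_max_bytes=1048576 seen in the FETCH request in the logs.
    static final long DEFAULT_MAX_PARTITION_FETCH_BYTES = 1_048_576L;

    /**
     * Simplifying assumption: every record is its own batch of avgRecordBytes.
     * Returns an estimate, capped by max.poll.records.
     */
    static long estimateRecordsPerFetch(long maxPartitionFetchBytes,
                                        long avgRecordBytes,
                                        long maxPollRecords) {
        if (maxPartitionFetchBytes <= 0 || avgRecordBytes <= 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        // Whole records that fit under the per-partition byte limit.
        long fitting = maxPartitionFetchBytes / avgRecordBytes;
        // The first batch is still returned when it exceeds the limit,
        // so the consumer always makes progress with at least one record.
        long atLeastOne = Math.max(1L, fitting);
        // poll() never hands back more than max.poll.records.
        return Math.min(atLeastOne, maxPollRecords);
    }

    public static void main(String[] args) {
        long avgRecordBytes = 700_000L; // ASSUMPTION: size of one base64 image record
        long maxPollRecords = 2000L;    // from the question's configuration

        long withDefault = estimateRecordsPerFetch(
                DEFAULT_MAX_PARTITION_FETCH_BYTES, avgRecordBytes, maxPollRecords);
        long withRaisedLimit = estimateRecordsPerFetch(
                50_000_000L, avgRecordBytes, maxPollRecords); // value suggested above

        System.out.println("default limit: ~" + withDefault + " record(s) per fetch");
        System.out.println("raised limit:  ~" + withRaisedLimit + " record(s) per fetch");
    }
}
```

With the assumed record size, the default limit leaves room for only one record per fetch, while the raised limit allows several dozen. The real numbers depend on your actual message sizes and on how the producer batches records.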