Why is the Kafka server limited to 100K messages?

I have an out-of-the-box Kafka server and the following script:

#!/usr/bin/perl

use Net::Kafka::Producer;
use AnyEvent;

my $condvar     = AnyEvent->condvar;
my $producer    = Net::Kafka::Producer->new(
    'bootstrap.servers' => 'localhost:9092'
);

for (my $index = 1;;$index++) {
    my $msg = "message: " . $index;
    $producer->produce(
        payload => $msg,
        topic   => "tracked-coords"
    )->then(sub {
        my $delivery_report = shift;
        $condvar->send;
        print "Message successfully delivered with offset " . $delivery_report->{offset};
    }, sub {
        my $error = shift;
        $condvar->send;
        die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
    });

}

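Why does the Kafka server stop at 100K messages?

Edit

The server stops reporting that it is receiving messages. The consumer also stops receiving messages.

Edit

The Kafka server logs this (at the end):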

message: 99998
message: 99999
message: 100000
[2022-03-21 14:43:30,597] INFO [ProducerStateManager partition=tracked-coords-0] Wrote producer snapshot at offset 500000 with 0 producer ids in 15 ms. (kafka.log.ProducerStateManager)
[2022-03-21 14:43:30,598] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 500000 in 18 ms. (kafka.log.Log)
[2022-03-21 14:43:30,599] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) due to retention time 2000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2022-03-21 14:43:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Incremented log start offset to 500000 due to segment deletion (kafka.log.Log)
[2022-03-21 14:44:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) (kafka.log.Log$)
[2022-03-21 14:44:30,612] INFO Deleted log /tmp/kafka-logs/tracked-coords-0/00000000000000400000.log.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted offset index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.index.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted time index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.timeindex.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,613] INFO Deleted producer state snapshot /tmp/kafka-logs/tracked-coords-0/00000000000000400000.snapshot.deleted (kafka.log.SnapshotFile)

This is the consumer code:

#!/usr/bin/perl

use feature qw( say );
use Net::Kafka::Consumer;
use AnyEvent;
use Data::Dumper;
use JSON;

my $consumer    = Net::Kafka::Consumer->new(
    'bootstrap.servers'     => 'localhost:9092',
    'group.id'              => 'mock_data',
    'enable.auto.commit'    => 'true',
);

$consumer->subscribe( [ "tracked-coords"] );

while (1) {
  my $msg = $consumer->poll(1000);
  if ($msg) {
    $consumer->commit(); #_message(0, $msg);
    say "====================================================================";
    if ( $msg->err ) {
      say "Error: ", Net::Kafka::Error::to_string($msg->err);
    } else {
      say $msg->payload;
    }
  }
}

The consumer stops at 100K.

Since you are using Net::Kafka, which is built on the librdkafka library, the cause may be the queue.buffering.max.messages setting. It defaults to 100,000. Its meaning:

Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. See: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html

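Try setting it to a lower number in your Net::Kafka::Producer->new() call and see whether the script stalls sooner. The setting supports a range of 1-10M. Oddly, I don't see it among the Kafka server settings, so my guess is that it is purely a setting of the edenhill driver (librdkafka).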
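
A minimal sketch of that experiment, assuming the same broker address as in the question. The value 1000 is an arbitrary number chosen only to make the effect visible quickly; it is not a recommended setting.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Net::Kafka::Producer;

# Same constructor call as in the question, with the librdkafka
# producer queue limit lowered on purpose.
# 1000 is an arbitrary test value (assumption), far below the 100,000 default.
my $producer = Net::Kafka::Producer->new(
    'bootstrap.servers'            => 'localhost:9092',
    'queue.buffering.max.messages' => 1000,
);
```

If the producer loop now stalls at roughly 1,000 messages instead of 100,000, that would suggest the limit is being hit in the client-side producer queue rather than on the Kafka server.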