Why is the Kafka server limited to 100K messages?
I'm running an out-of-the-box Kafka server with the following producer script:
#!/usr/bin/perl
use Net::Kafka::Producer;
use AnyEvent;

my $condvar  = AnyEvent->condvar;
my $producer = Net::Kafka::Producer->new(
    'bootstrap.servers' => 'localhost:9092'
);

for (my $index = 1;; $index++) {
    my $msg = "message: " . $index;
    $producer->produce(
        payload => $msg,
        topic   => "tracked-coords"
    )->then(sub {
        my $delivery_report = shift;
        $condvar->send;
        print "Message successfully delivered with offset " . $delivery_report->{offset};
    }, sub {
        my $error = shift;
        $condvar->send;
        die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
    });
}
Why does the Kafka server stop at 100K messages?
EDIT
The server stops reporting that it is receiving messages, and the consumer stops receiving messages as well.
EDIT
The Kafka server logs this (the tail end):
message: 99998
message: 99999
message: 100000
[2022-03-21 14:43:30,597] INFO [ProducerStateManager partition=tracked-coords-0] Wrote producer snapshot at offset 500000 with 0 producer ids in 15 ms. (kafka.log.ProducerStateManager)
[2022-03-21 14:43:30,598] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 500000 in 18 ms. (kafka.log.Log)
[2022-03-21 14:43:30,599] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) due to retention time 2000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2022-03-21 14:43:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Incremented log start offset to 500000 due to segment deletion (kafka.log.Log)
[2022-03-21 14:44:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) (kafka.log.Log$)
[2022-03-21 14:44:30,612] INFO Deleted log /tmp/kafka-logs/tracked-coords-0/00000000000000400000.log.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted offset index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.index.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted time index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.timeindex.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,613] INFO Deleted producer state snapshot /tmp/kafka-logs/tracked-coords-0/00000000000000400000.snapshot.deleted (kafka.log.SnapshotFile)
Here is the consumer code:
#!/usr/bin/perl
use feature qw( say );
use Net::Kafka::Consumer;
use AnyEvent;
use Data::Dumper;
use JSON;

my $consumer = Net::Kafka::Consumer->new(
    'bootstrap.servers'  => 'localhost:9092',
    'group.id'           => 'mock_data',
    'enable.auto.commit' => 'true',
);

$consumer->subscribe( ["tracked-coords"] );

while (1) {
    my $msg = $consumer->poll(1000);
    if ($msg) {
        $consumer->commit();    #_message(0, $msg);
        say "====================================================================";
        if ( $msg->err ) {
            say "Error: ", Net::Kafka::Error::to_string( $msg->err );
        } else {
            say $msg->payload;
        }
    }
}
The consumer stops at 100K.
Since you are using Net::Kafka, which is built on the librdkafka library, this is probably the queue.buffering.max.messages setting. It defaults to 100,000. Its meaning:
Maximum number of messages allowed on the producer queue. This queue
is shared by all topics and partitions.
See: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
Try setting it to a lower number in your Net::Kafka::Producer->new() call and see whether it hits the limit sooner. The setting accepts values from 1 to 10,000,000. Oddly, I don't see it among the Kafka server settings, so my guess is that it is purely an edenhill (librdkafka) client-side setting.
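For example, here is a minimal sketch of that test. The value 1000 is arbitrary, chosen only to make the stall reproduce quickly; Net::Kafka hands producer configuration keys straight through to librdkafka, the same way bootstrap.servers is passed above.

#!/usr/bin/perl
use strict;
use warnings;
use Net::Kafka::Producer;

# Same constructor as in the question, but with the librdkafka local
# queue cap lowered from its 100,000 default. If the producer now
# stalls at around 1,000 messages instead, the client-side producer
# queue is what is filling up.
my $producer = Net::Kafka::Producer->new(
    'bootstrap.servers'            => 'localhost:9092',
    'queue.buffering.max.messages' => 1000,
);

If it does stall sooner, that confirms the limit is the client-side queue, and raising queue.buffering.max.messages would only postpone the stall. In that case the likely culprit is that the tight for loop never returns control to the AnyEvent event loop (there is no $condvar->recv anywhere), so the then() callbacks never get a chance to run; yielding to the event loop periodically should let delivery reports be processed and the queue drain.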