如何使用 Kafka .NET 客户端 API 列出每个分区和消费者组的 Kafka 消费者滞后和最新偏移量?
How can I list the Kafka consumer lag and latest offset per partition and consumer group using the Kafka .NET client API?
我正在尝试使用 .NET Confluent.Kafka 1.4.0-RC1(面向 net472)计算消费者滞后(consumer lag)。
我可以使用此脚本获得所需的结果:
$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group Grp1 --describe
结果:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
Grp1 test3 1 15 15 0 rdkafka-ca76855f-7b66-4bf1-82bc-73e9a1c1cf71 /10.186.129.93 rdkafka
Grp1 test3 2 13 13 0 rdkafka-d64379dc-881a-4f6f-a793-51e832cc2f5a /10.186.129.93 rdkafka
Grp1 test3 0 9 9 0 rdkafka-a25bdb80-3b70-4e42-963e-d41ad9e2a99a /10.186.129.93 rdkafka
Grp1 test 0 68 68 0 - - -
我无法使用 .NET 客户端代码获得类似的报告。这是我一直在尝试的代码——但由于 consumer.Assignment 属性是一个空集合,所以一无所获。
/// <summary>
/// Builds a plain-text report containing, for each partition in the consumer's
/// current assignment, its current position followed by the committed offsets
/// of the assignment.
/// </summary>
/// <param name="bootstrapServers">Value used for <c>BootstrapServers</c> in the consumer configuration.</param>
/// <param name="consumerGroupName">Value used for <c>GroupId</c> in the consumer configuration.</param>
/// <returns>The report header followed by whatever lines the assignment loop produced.</returns>
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName)
{
    // Reference command kept from the original author:
    // kafka-console-consumer.bat --zookeeper MW45670117:2380 --topic powertelemetry --consumer-property group.id=test123 --consumer-property enable.auto.commit=true
    var report = new StringBuilder();
    report.AppendLine("\n")
          .AppendLine("Consumer Group Lag Report")
          .AppendLine("-------------------------");

    var settings = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(settings).Build())
    {
        // NOTE(review): this method never calls Assign or Subscribe on the consumer
        // before reading Assignment; the author reports the collection is empty,
        // in which case the loop body never runs and only the header is returned.
        foreach (var assigned in consumer.Assignment)
        {
            // Current position (offset) for this topic/partition.
            var current = consumer.Position(
                new TopicPartition(assigned.Topic, new Partition(assigned.Partition.Value)));
            report.AppendLine($"Offset value is: {current.Value}");

            // Committed offsets for the current assignment (queried once per assigned partition).
            foreach (var committedEntry in consumer.Committed(TimeSpan.FromSeconds(4)))
            {
                report.AppendLine($"Commited offset for partition {committedEntry.TopicPartition} is {committedEntry.Offset}");
            }
        }
    }

    return report.ToString();
}
正在寻找每个 partition/consumer-group 的消费者滞后和最新偏移量。
好的,我终于找到了方法。看起来需要先把主题分区分配(Assign)给消费者。在我的例子中,主题是 test3,共有 3 个分区,这些必须在代码中设置。我想更通用的实现是把与该组关联的主题分区列表传给该方法。
以下是我如何获得消费者延迟:
/// <summary>
/// Builds a plain-text lag report for partitions 0, 1 and 2 of topic "test3"
/// as seen by the given consumer group.
/// </summary>
/// <param name="bootstrapServers">Value used for <c>BootstrapServers</c> in the consumer configuration.</param>
/// <param name="consumerGroupName">Value used for <c>GroupId</c>; committed offsets are read for this group.</param>
/// <returns>
/// The report header followed by one line per partition: committed offset, high
/// watermark and lag, or a "no committed offset" line when the group has not
/// committed anything for that partition.
/// </returns>
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
    StringBuilder sb = new StringBuilder();
    sb.AppendLine("\n");
    sb.AppendLine("Consumer Group Lag Report");
    sb.AppendLine("-------------------------");

    ConsumerConfig config = new ConsumerConfig {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    TimeSpan timeout = TimeSpan.FromSeconds(40);

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) {
        // The partitions to inspect are hard-coded; Committed(timeout) reports on the current assignment.
        List<TopicPartition> topicPartitions = new List<TopicPartition> {
            new TopicPartition("test3", new Partition(0)),
            new TopicPartition("test3", new Partition(1)),
            new TopicPartition("test3", new Partition(2))
        };
        consumer.Assign(topicPartitions);

        List<TopicPartitionOffset> tpos = consumer.Committed(timeout);
        foreach (TopicPartitionOffset tpo in tpos) {
            WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, timeout);
            long logEndOffset = w.High.Value;

            // A partition without a committed offset carries a special (negative sentinel)
            // Offset; subtracting it from the high watermark would report a bogus, inflated lag.
            if (tpo.Offset.IsSpecial) {
                sb.AppendLine($"No committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition}; watermark end offset is {logEndOffset}");
                continue;
            }

            long commited = tpo.Offset.Value;
            long lag = logEndOffset - commited;
            sb.AppendLine($"Commited offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {commited} out of Wantermark end offset {logEndOffset} Lag is: {lag}");
        }
        sb.AppendLine();
    }
    return sb.ToString();
}
我对您的回答进行了扩展,加入了从集群元数据自动生成主题分区列表的逻辑。
/// <summary>
/// Produces consumer-group lag reports by comparing committed offsets with
/// partition high watermarks.
/// </summary>
public class Offsets
{
    /// <summary>
    /// Builds a plain-text lag report covering every partition of every
    /// non-internal topic returned by the cluster metadata, as seen by the
    /// given consumer group.
    /// </summary>
    /// <param name="bootstrapServers">Value used for <c>BootstrapServers</c> in the client configuration.</param>
    /// <param name="consumerGroupName">Value used for <c>GroupId</c>; committed offsets are read for this group.</param>
    /// <returns>
    /// The report header followed by one line per partition: committed offset,
    /// high watermark and lag, or a "no committed offset" line when the group
    /// has not committed anything for that partition.
    /// </returns>
    public string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName)
    {
        var sb = new StringBuilder();
        sb.AppendLine("\n");
        sb.AppendLine("Consumer Group Lag Report");
        sb.AppendLine("-------------------------");

        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = consumerGroupName,
            SecurityProtocol = SecurityProtocol.Ssl
        };

        var timeout = TimeSpan.FromSeconds(40);

        using var adminClient = new AdminClientBuilder(config).Build();
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

        var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));

        // Skip Kafka-internal topics, which are prefixed with "__" (e.g. __consumer_offsets).
        // A prefix test is used so user topics that merely contain "__" are still reported.
        var topicPartitions = meta.Topics
            .Where(topicMeta => !topicMeta.Topic.StartsWith("__", StringComparison.Ordinal))
            .SelectMany(topicMeta => topicMeta.Partitions.Select(
                partitionMeta => new TopicPartition(topicMeta.Topic, new Partition(partitionMeta.PartitionId))))
            .ToList();

        // Committed(timeout) reports on the current assignment, so assign everything first.
        consumer.Assign(topicPartitions);

        List<TopicPartitionOffset> tpos = consumer.Committed(timeout);
        foreach (TopicPartitionOffset tpo in tpos)
        {
            WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, timeout);
            long logEndOffset = w.High.Value;

            // Partitions this group never committed to carry a special (negative sentinel)
            // Offset; subtracting it from the high watermark would report a bogus, inflated lag.
            if (tpo.Offset.IsSpecial)
            {
                sb.AppendLine(
                    $"No committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition}; watermark end offset is {logEndOffset}");
                continue;
            }

            long commited = tpo.Offset.Value;
            long lag = logEndOffset - commited;
            sb.AppendLine(
                $"Commited offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {commited} out of Wantermark end offset {logEndOffset} Lag is: {lag}");
        }

        sb.AppendLine();
        return sb.ToString();
    }
}
我正在尝试使用 .NET Confluent.Kafka 1.4.0-RC1(面向 net472)计算消费者滞后(consumer lag)。我可以使用此脚本获得所需的结果:
$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group Grp1 --describe
结果:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
Grp1 test3 1 15 15 0 rdkafka-ca76855f-7b66-4bf1-82bc-73e9a1c1cf71 /10.186.129.93 rdkafka
Grp1 test3 2 13 13 0 rdkafka-d64379dc-881a-4f6f-a793-51e832cc2f5a /10.186.129.93 rdkafka
Grp1 test3 0 9 9 0 rdkafka-a25bdb80-3b70-4e42-963e-d41ad9e2a99a /10.186.129.93 rdkafka
Grp1 test 0 68 68 0 - - -
我无法使用 .NET 客户端代码获得类似的报告。这是我一直在尝试的代码——但由于 consumer.Assignment 属性是一个空集合,所以一无所获。
/// <summary>
/// Builds a plain-text report containing, for each partition in the consumer's
/// current assignment, its current position followed by the committed offsets
/// of the assignment.
/// </summary>
/// <param name="bootstrapServers">Value used for <c>BootstrapServers</c> in the consumer configuration.</param>
/// <param name="consumerGroupName">Value used for <c>GroupId</c> in the consumer configuration.</param>
/// <returns>The report header followed by whatever lines the assignment loop produced.</returns>
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName)
{
    // Reference command kept from the original author:
    // kafka-console-consumer.bat --zookeeper MW45670117:2380 --topic powertelemetry --consumer-property group.id=test123 --consumer-property enable.auto.commit=true
    var report = new StringBuilder();
    report.AppendLine("\n")
          .AppendLine("Consumer Group Lag Report")
          .AppendLine("-------------------------");

    var settings = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(settings).Build())
    {
        // NOTE(review): this method never calls Assign or Subscribe on the consumer
        // before reading Assignment; the author reports the collection is empty,
        // in which case the loop body never runs and only the header is returned.
        foreach (var assigned in consumer.Assignment)
        {
            // Current position (offset) for this topic/partition.
            var current = consumer.Position(
                new TopicPartition(assigned.Topic, new Partition(assigned.Partition.Value)));
            report.AppendLine($"Offset value is: {current.Value}");

            // Committed offsets for the current assignment (queried once per assigned partition).
            foreach (var committedEntry in consumer.Committed(TimeSpan.FromSeconds(4)))
            {
                report.AppendLine($"Commited offset for partition {committedEntry.TopicPartition} is {committedEntry.Offset}");
            }
        }
    }

    return report.ToString();
}
正在寻找每个 partition/consumer-group 的消费者滞后和最新偏移量。
好的,我终于找到了方法。看起来需要先把主题分区分配(Assign)给消费者。在我的例子中,主题是 test3,共有 3 个分区,这些必须在代码中设置。我想更通用的实现是把与该组关联的主题分区列表传给该方法。
以下是我如何获得消费者延迟:
/// <summary>
/// Builds a plain-text lag report for partitions 0, 1 and 2 of topic "test3"
/// as seen by the given consumer group.
/// </summary>
/// <param name="bootstrapServers">Value used for <c>BootstrapServers</c> in the consumer configuration.</param>
/// <param name="consumerGroupName">Value used for <c>GroupId</c>; committed offsets are read for this group.</param>
/// <returns>
/// The report header followed by one line per partition: committed offset, high
/// watermark and lag, or a "no committed offset" line when the group has not
/// committed anything for that partition.
/// </returns>
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
    StringBuilder sb = new StringBuilder();
    sb.AppendLine("\n");
    sb.AppendLine("Consumer Group Lag Report");
    sb.AppendLine("-------------------------");

    ConsumerConfig config = new ConsumerConfig {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    TimeSpan timeout = TimeSpan.FromSeconds(40);

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) {
        // The partitions to inspect are hard-coded; Committed(timeout) reports on the current assignment.
        List<TopicPartition> topicPartitions = new List<TopicPartition> {
            new TopicPartition("test3", new Partition(0)),
            new TopicPartition("test3", new Partition(1)),
            new TopicPartition("test3", new Partition(2))
        };
        consumer.Assign(topicPartitions);

        List<TopicPartitionOffset> tpos = consumer.Committed(timeout);
        foreach (TopicPartitionOffset tpo in tpos) {
            WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, timeout);
            long logEndOffset = w.High.Value;

            // A partition without a committed offset carries a special (negative sentinel)
            // Offset; subtracting it from the high watermark would report a bogus, inflated lag.
            if (tpo.Offset.IsSpecial) {
                sb.AppendLine($"No committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition}; watermark end offset is {logEndOffset}");
                continue;
            }

            long commited = tpo.Offset.Value;
            long lag = logEndOffset - commited;
            sb.AppendLine($"Commited offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {commited} out of Wantermark end offset {logEndOffset} Lag is: {lag}");
        }
        sb.AppendLine();
    }
    return sb.ToString();
}
我对您的回答进行了扩展,加入了从集群元数据自动生成主题分区列表的逻辑。
/// <summary>
/// Produces consumer-group lag reports by comparing committed offsets with
/// partition high watermarks.
/// </summary>
public class Offsets
{
    /// <summary>
    /// Builds a plain-text lag report covering every partition of every
    /// non-internal topic returned by the cluster metadata, as seen by the
    /// given consumer group.
    /// </summary>
    /// <param name="bootstrapServers">Value used for <c>BootstrapServers</c> in the client configuration.</param>
    /// <param name="consumerGroupName">Value used for <c>GroupId</c>; committed offsets are read for this group.</param>
    /// <returns>
    /// The report header followed by one line per partition: committed offset,
    /// high watermark and lag, or a "no committed offset" line when the group
    /// has not committed anything for that partition.
    /// </returns>
    public string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName)
    {
        var sb = new StringBuilder();
        sb.AppendLine("\n");
        sb.AppendLine("Consumer Group Lag Report");
        sb.AppendLine("-------------------------");

        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = consumerGroupName,
            SecurityProtocol = SecurityProtocol.Ssl
        };

        var timeout = TimeSpan.FromSeconds(40);

        using var adminClient = new AdminClientBuilder(config).Build();
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

        var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));

        // Skip Kafka-internal topics, which are prefixed with "__" (e.g. __consumer_offsets).
        // A prefix test is used so user topics that merely contain "__" are still reported.
        var topicPartitions = meta.Topics
            .Where(topicMeta => !topicMeta.Topic.StartsWith("__", StringComparison.Ordinal))
            .SelectMany(topicMeta => topicMeta.Partitions.Select(
                partitionMeta => new TopicPartition(topicMeta.Topic, new Partition(partitionMeta.PartitionId))))
            .ToList();

        // Committed(timeout) reports on the current assignment, so assign everything first.
        consumer.Assign(topicPartitions);

        List<TopicPartitionOffset> tpos = consumer.Committed(timeout);
        foreach (TopicPartitionOffset tpo in tpos)
        {
            WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, timeout);
            long logEndOffset = w.High.Value;

            // Partitions this group never committed to carry a special (negative sentinel)
            // Offset; subtracting it from the high watermark would report a bogus, inflated lag.
            if (tpo.Offset.IsSpecial)
            {
                sb.AppendLine(
                    $"No committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition}; watermark end offset is {logEndOffset}");
                continue;
            }

            long commited = tpo.Offset.Value;
            long lag = logEndOffset - commited;
            sb.AppendLine(
                $"Commited offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {commited} out of Wantermark end offset {logEndOffset} Lag is: {lag}");
        }

        sb.AppendLine();
        return sb.ToString();
    }
}