如何使用 Confluent 的 .NET 客户端列出 Kafka 消费者组
How can I list Kafka consumer groups using Confluent's .NET Client
我已经在我的开发 windows 机器上安装了 Kafka (kafka_2.11-2.4.0.tgz)。一切正常,我可以使用示例批处理脚本发送和接收消息。
尝试构建 C# windows 应用程序,我在 C# windows 应用程序上使用 Confluent 的 Apache Kafka .NET 客户端(版本 1.3.0.0)。但是,当我使用 Confluent 提供的示例列出组订阅时,我收到错误 System.ArgumentNullException: 'Argument 'group' must not be null. Parameter name: group'
但我没有将此类参数传递给该方法。这是代码示例:
private static string ListConsumerGroups(string bootstrapServers) {
string s = ($"Consumer Groups:");
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
var groups = adminClient.ListGroups(TimeSpan.FromSeconds(10));
foreach (var g in groups) {
s += "\r\n" + ($" Group: {g.Group} {g.Error} {g.State}");
s += "\r\n" + ($" Broker: {g.Broker.BrokerId} {g.Broker.Host}:{g.Broker.Port}");
s += "\r\n" + ($" Protocol: {g.ProtocolType} {g.Protocol}");
s += "\r\n" + ($" Members:");
foreach (var m in g.Members) {
s += "\r\n" + ($" {m.MemberId} {m.ClientId} {m.ClientHost}");
s += "\r\n" + ($" Metadata: {m.MemberMetadata.Length} bytes");
s += "\r\n" + ($" Assignment: {m.MemberAssignment.Length} bytes");
}
}
}
return s;
}
使用时间跨度参数调用 adminClient.ListGroups
时代码失败(此方法中不允许使用其他参数)。我不明白 API 中是否存在错误,或者我只是遗漏了一些东西。
当使用批处理脚本列出消费者组时,我确实得到了有效的答复。
> kafka-consumer-groups.bat --list --bootstrap-server "localhost:9092"
这是 v1.3.0 中引入的回归。在 v1.4.0-RC1 及更高版本中已解决。
我已经在我的开发 windows 机器上安装了 Kafka (kafka_2.11-2.4.0.tgz)。一切正常,我可以使用示例批处理脚本发送和接收消息。
尝试构建 C# windows 应用程序,我在 C# windows 应用程序上使用 Confluent 的 Apache Kafka .NET 客户端(版本 1.3.0.0)。但是,当我使用 Confluent 提供的示例列出组订阅时,我收到错误 System.ArgumentNullException: 'Argument 'group' must not be null. Parameter name: group'
但我没有将此类参数传递给该方法。这是代码示例:
private static string ListConsumerGroups(string bootstrapServers) {
string s = ($"Consumer Groups:");
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
var groups = adminClient.ListGroups(TimeSpan.FromSeconds(10));
foreach (var g in groups) {
s += "\r\n" + ($" Group: {g.Group} {g.Error} {g.State}");
s += "\r\n" + ($" Broker: {g.Broker.BrokerId} {g.Broker.Host}:{g.Broker.Port}");
s += "\r\n" + ($" Protocol: {g.ProtocolType} {g.Protocol}");
s += "\r\n" + ($" Members:");
foreach (var m in g.Members) {
s += "\r\n" + ($" {m.MemberId} {m.ClientId} {m.ClientHost}");
s += "\r\n" + ($" Metadata: {m.MemberMetadata.Length} bytes");
s += "\r\n" + ($" Assignment: {m.MemberAssignment.Length} bytes");
}
}
}
return s;
}
使用时间跨度参数调用 adminClient.ListGroups
时代码失败(此方法中不允许使用其他参数)。我不明白 API 中是否存在错误,或者我只是遗漏了一些东西。
当使用批处理脚本列出消费者组时,我确实得到了有效的答复。
> kafka-consumer-groups.bat --list --bootstrap-server "localhost:9092"
这是 v1.3.0 中引入的回归。在 v1.4.0-RC1 及更高版本中已解决。