Confluent Kafka如何在不订阅的情况下获取特定主题的元数据
Confluent Kafka how to get specific topic metadata without subscribe
我使用一些代码来确定读取偏移量和最大值之间的差异。它需要内部诊断,例如,如果差异低于“基线”值 - 所以,读取速度很好。
提供的代码示例运行良好,但我应该知道主题的分区数。并且,稍后获取主题中每个分区的最大偏移值。
如何在不订阅主题的情况下获取主题元数据?
类似于:主题“TestTopic”有 4 个分区。
public async Task MaxOffsetValues()
{
// just for tests
// ToDo: use config from settings
while (true)
{
var topicName = "testTopic";
var config = new ConsumerConfig
{
BootstrapServers = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092",
GroupId = Guid.NewGuid().ToString(),
ClientId = Dns.GetHostName(),
EnableAutoCommit = false,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var offsetBorders = consumer.QueryWatermarkOffsets(new TopicPartition(topicName, 0), TimeSpan.FromSeconds(10));
_log.Debug($"[Diagnostic] Topic: ({topicName}), Partition: ({0}) Minimal offset: ({offsetBorders.Low}) Maximum offset: ({offsetBorders.High})");
}
await Task.Delay(TimeSpan.FromSeconds(60));
}
}
如果您正在寻找以编程方式和非编程方式获取元数据,则可以通过 3 种方式获取有关主题的信息而无需订阅它。
- 管理员客户端(https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html)
- 如果您有权访问 confluent-control-center,那么它会显示有关主题的元数据。
- 你也可以使用Kafka-topics.sh CLI工具
因为我的目标已经足够(为清楚起见减少了代码部分):
// ...
var brokerServers = _diagnosticSettings.Value.BrokerHosts;
var brokerDelayInSeconds = TimeSpan.FromSeconds(_diagnosticSettings.Value.BrokerDelayInSeconds);
var adminClientConfig = new AdminClientConfig
{
BootstrapServers = brokerServers,
ClientId = Dns.GetHostName(),
};
adminClient = new AdminClientBuilder(adminClientConfig).Build();
Metadata topicMetadata = null;
topicMetadata = adminClient.GetMetadata(topicExternalName, brokerDelayInSeconds);
partitionCount = topicMetadata.Topics[0].Partitions.Count;
// ...
我使用一些代码来确定读取偏移量和最大值之间的差异。它需要内部诊断,例如,如果差异低于“基线”值 - 所以,读取速度很好。
提供的代码示例运行良好,但我应该知道主题的分区数。并且,稍后获取主题中每个分区的最大偏移值。
如何在不订阅主题的情况下获取主题元数据?
类似于:主题“TestTopic”有 4 个分区。
public async Task MaxOffsetValues()
{
// just for tests
// ToDo: use config from settings
while (true)
{
var topicName = "testTopic";
var config = new ConsumerConfig
{
BootstrapServers = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092",
GroupId = Guid.NewGuid().ToString(),
ClientId = Dns.GetHostName(),
EnableAutoCommit = false,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var offsetBorders = consumer.QueryWatermarkOffsets(new TopicPartition(topicName, 0), TimeSpan.FromSeconds(10));
_log.Debug($"[Diagnostic] Topic: ({topicName}), Partition: ({0}) Minimal offset: ({offsetBorders.Low}) Maximum offset: ({offsetBorders.High})");
}
await Task.Delay(TimeSpan.FromSeconds(60));
}
}
如果您正在寻找以编程方式和非编程方式获取元数据,则可以通过 3 种方式获取有关主题的信息而无需订阅它。
- 管理员客户端(https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html)
- 如果您有权访问 confluent-control-center,那么它会显示有关主题的元数据。
- 你也可以使用Kafka-topics.sh CLI工具
因为我的目标已经足够(为清楚起见减少了代码部分):
// ...
var brokerServers = _diagnosticSettings.Value.BrokerHosts;
var brokerDelayInSeconds = TimeSpan.FromSeconds(_diagnosticSettings.Value.BrokerDelayInSeconds);
var adminClientConfig = new AdminClientConfig
{
BootstrapServers = brokerServers,
ClientId = Dns.GetHostName(),
};
adminClient = new AdminClientBuilder(adminClientConfig).Build();
Metadata topicMetadata = null;
topicMetadata = adminClient.GetMetadata(topicExternalName, brokerDelayInSeconds);
partitionCount = topicMetadata.Topics[0].Partitions.Count;
// ...