如何使用 Confluent.Kafka .Net 客户端创建 Kafka 主题
How to create a Kafka Topic using Confluent.Kafka .Net Client
似乎最流行的 Kafka .net 客户端 (https://github.com/confluentinc/confluent-kafka-dotnet) 缺少设置和创建主题的方法。
调用 Producer.ProduceAsync()
时,主题会自动创建,但我找不到设置分区、保留策略和其他设置的方法。
我试图在网上找到任何示例,但我找到的只是使用默认值。
也许我可以使用另一个 .net 客户端?
Confluent.Kafka.AdminClient
在版本 1.0.0-experimental-2
中可用,但不允许创建主题等。
它基于 librdkafka
which doesn't have APIs for this yet.
所以现在你必须在代理上配置它,例如使用bin\windows\kafka-topics.sh --create ...
Confluent 尚未提供任何 API 来从 .net 客户端创建主题,但是有一个解决方法。
Set auto.create.topics.enable = true
in kafka configuration
use var brokerMetadata = producer.GetMetadata(false, topicName);
to query available topics in existing brokers, if specified topic is
not available then kafka will create a topic with specified name.
private static bool CreateTopicIfNotExist(Producer producer, string topicName)
{
bool isTopicExist = producer.GetMetadata().Topics.Any(t => t.Topic == topicName);
if (!isTopicExist)
{
//Creates topic if it is not exist; Only in case of auto.create.topics.enable = true is set into kafka configuration
var topicMetadata = producer.GetMetadata(false, topicName).Topics.FirstOrDefault();
if (topicMetadata != null && (topicMetadata.Error.Code != ErrorCode.UnknownTopicOrPart || topicMetadata.Error.Code == ErrorCode.Local_UnknownTopic))
isTopicExist = true;
}
return isTopicExist;
}
因此你可以使用这个变通方法,我知道这是一个肮脏的解决方案,但目前似乎没有其他方法。
它现在在 Confluent.Kafka .Net 客户端库的最新版本中可用。
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
await adminClient.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
}
catch (CreateTopicsException e)
{
Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}
似乎最流行的 Kafka .net 客户端 (https://github.com/confluentinc/confluent-kafka-dotnet) 缺少设置和创建主题的方法。
调用 Producer.ProduceAsync()
时,主题会自动创建,但我找不到设置分区、保留策略和其他设置的方法。
我试图在网上找到任何示例,但我找到的只是使用默认值。
也许我可以使用另一个 .net 客户端?
Confluent.Kafka.AdminClient
在版本 1.0.0-experimental-2
中可用,但不允许创建主题等。
它基于 librdkafka
which doesn't have APIs for this yet.
所以现在你必须在代理上配置它,例如使用bin\windows\kafka-topics.sh --create ...
Confluent 尚未提供任何 API 来从 .net 客户端创建主题,但是有一个解决方法。
Set
auto.create.topics.enable = true
in kafka configurationuse
var brokerMetadata = producer.GetMetadata(false, topicName);
to query available topics in existing brokers, if specified topic is not available then kafka will create a topic with specified name.
private static bool CreateTopicIfNotExist(Producer producer, string topicName)
{
bool isTopicExist = producer.GetMetadata().Topics.Any(t => t.Topic == topicName);
if (!isTopicExist)
{
//Creates topic if it is not exist; Only in case of auto.create.topics.enable = true is set into kafka configuration
var topicMetadata = producer.GetMetadata(false, topicName).Topics.FirstOrDefault();
if (topicMetadata != null && (topicMetadata.Error.Code != ErrorCode.UnknownTopicOrPart || topicMetadata.Error.Code == ErrorCode.Local_UnknownTopic))
isTopicExist = true;
}
return isTopicExist;
}
因此你可以使用这个变通方法,我知道这是一个肮脏的解决方案,但目前似乎没有其他方法。
它现在在 Confluent.Kafka .Net 客户端库的最新版本中可用。
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
await adminClient.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
}
catch (CreateTopicsException e)
{
Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}