如何在运行中/启动时创建一个 Kafka 主题供生产者发送给?

How do I create a Kafka topic on the fly / on startup for the producer to send to?

我开始使用 Kafka 的 Confluent .NET 库,并尝试实现我过去用于 Azure 服务总线的模式,以在生产者应用程序启动时创建主题(如果不存在则创建) .这在 Kafka API 中如何完成,是否可以完成?

这将允许主题成为源代码控制的一部分并在自动发布过程中配置,而不是根据 topic/environment 手动设置。另外,我希望我的开发人员不必去每个 Kafka 实例/环境并首先配置它们以匹配。

如果我不能这样做,我将不得不在发布过程中将其烘焙到 bash 脚本中,但更喜欢在启动代码中。

您可以启用 cluster-wide 配置 auto.create.topics.enable

如果新的生产者试图将数据发送到尚不存在的主题,这将自动创建一个主题。

但是,请注意以下几点:

  • 将使用有关复制、分区数和保留的默认设置创建主题。确保根据需要更改这些默认设置。无论如何,所有自动创建的主题将具有相同的配置。
  • 生产者代码中主题名称配置中的拼写错误会导致创建不需要的主题。

或者,您可以使用 AdminClient API。示例显示 here:

static async Task CreateTopicAsync(string bootstrapServers, string topicName) { using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try { await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } }); } catch (CreateTopicsException e) { Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); } } }