如何在 apache kafka 中创建主题?

How to create topics in apache kafka?

在kafka中创建主题的最佳方式是什么?

在新的生产者API中,当我尝试向不存在的主题发布消息时,第一次失败然后成功发布。

分区号决定了主题的并行度,因为一个分区只能被一个消费者组中的一个消费者消费。例如,如果一个主题只有 10 个分区,一个消费者组中有 20 个消费者,则有 10 个消费者处于空闲状态,没有收到任何消息。数量确实取决于您的应用程序,但 1-1000 都是合理的。

副本数量取决于您的耐用性要求。对于复制因子为 N 的主题,Kafka 最多可以容忍 N-1 个服务器故障而不会丢失任何提交到日志的消息。 3 个副本是常见的配置。当然,副本编号必须小于或等于您的经纪人编号。

auto.create.topics.enable 属性 控制 Kafka 何时在服务器上启用自动创建主题。如果设置为 true,当应用程序尝试为不存在的主题生成、使用或获取元数据时,Kafka 将自动创建具有默认复制因子和分区数的主题。我建议在生产中关闭它并提前创建主题。

当您启动 Kafka 代理时,您可以在 conf/server.properties 文件中定义一组属性。该文件只是键值 属性 文件。其中一个属性是 auto.create.topics.enable,如果它设置为 true(默认情况下),当您向不存在的主题发送消息时,Kafka 将自动创建主题。

您可以找到的所有配置选项均已定义 here。恕我直言,创建主题的简单规则如下:副本数不能超过您拥有的节点数。主题和分区的数量不受集群中节点数量的影响

例如:

  • 您有 9 个节点集群
  • 您的主题可以有 9 个分区和 9 个副本,或者 18 个分区和 9 个副本或 36个分区和9个副本等等...

设置属性 auto.create.topics.enable=true 在您的 server.properties 文件中,如果您有多个代理,请对所有服务器*.properties 文件执行相同的操作并重新启动您的 kafka-server。 但是一定要在server*.properties中设置合适的分区数num.partitions=int,否则以后增加分区会出现性能问题

Kafka 中并行的基本级别是分区。在生产者和代理端,对不同分区的写入可以完全并行完成。

注意事项

  • 更多分区需要更多打开文件句柄
  • 更多分区可能会增加不可用性
  • 更多分区可能会增加端到端延迟

根据经验,将每个代理的分区数量限制为 100 x b x r 可能是个好主意, 其中 b 是经纪人的数量,r 是复制因子。

例如: 如果您的集群中有 9 brokers/nodes,您的主题可能有

  • 1800 个分区,3 个副本,或
  • 900 个分区和 2 个副本

编辑:有关详细信息,请参阅文章 How to choose the number of topics/partitions in a Kafka cluster?(已从那里获取答案)

我想分享我最近在我的博客上描述的经历 The Side Effect of Fetching Kafka Topic Metadata 并回答这里提出的某些问题。

1) 在kafka中创建主题的最佳方式是什么?我们需要在发布消息之前创建主题吗?

我认为如果我们事先知道我们将使用固定名称的 Kafka 主题,那么我们最好在从中写入或读取消息之前创建主题。这通常可以通过使用 bin/kafka-topics.sh 在 post 启动脚本中完成,请参阅 Kafka 0.11.0.0.

中引入的 official documentation for example. Or we can use KafkaAdminClient

另一方面,我确实看到某些情况下我们需要即时生成主题名称。在这些情况下,我们将无法知道固定的主题名称,我们可以依赖 "auto.create.topics.enable" 属性。启用后,将自动创建一个主题。这就引出了第二个问题:

2) 当auto.create.topics.enable 为真

时哪些操作会导致创建

正如@Lan 已经指出的

If this is set to true, when applications attempt to produce, consume, or fetch metadata for a non-existent topic, Kafka will automatically create the topic with the default replication factor and number of partitions.

我想再简单点:

如果为 Kafka 代理启用自动创建主题,每当 Kafka 代理看到特定主题名称时,如果该主题尚不存在,就会创建该主题

此外,包括我在内的人们经常忽视获取元数据会自动创建主题这一事实。一个具体的例子是使用 consumer.partitionFor(topic) API,如果给定的主题不存在,此方法将创建它。

任何对我上面提到的更多细节感兴趣的人,您也可以看看我自己关于同一主题的博客 post The Side Effect of Fetching Kafka Topic Metadata

您可以通过编程方式创建主题。

public class CreateTopic {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(config);
      //creating new topic
      System.out.println("-- creating --");
      NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
      admin.createTopics(Collections.singleton(newTopic));

      //listing
      System.out.println("-- listing --");
      admin.listTopics().names().get().forEach(System.out::println);
  }
}