Kafka 生产者可以创建主题和分区吗?
Can a Kafka producer create topics and partitions?
目前我正在评估不同的消息系统。
有一个与 Apache Kafka 相关的问题,我自己无法回答。
Kafka 生产者是否可以动态创建主题和分区(以及在现有主题上)?
如果有,有什么缺点吗?
提前致谢
已更新:
kafka 代理有一个 属性:
auto.create.topics.enable
如果您将其设置为 true,如果生产者使用新主题名称向主题发布消息,它将自动为您创建一个主题。
Confluent 团队建议不要这样做,因为根据您的环境,主题的爆炸式增长可能会变得笨拙,并且主题创建在创建时将始终具有相同的默认值。重要的是复制因子至少为 3,以确保您的主题在发生磁盘故障时的持久性。
当你启动你的 kafka 代理时,你可以在 conf/server.properties
文件中定义一堆属性。 属性 之一是 auto.create.topics.enable
如果您将此设置为 true(默认情况下),当您向不存在的主题发送消息时,kafka 将自动创建一个主题。分区号将由同一文件中的默认设置定义。
缺点:据我所知,以这种方式创建的主题将始终具有相同的默认设置(分区、副本...)。
对于任何消息传递系统,我认为不建议使用生产者动态创建 topic/partition 或任何队列的方法。
对于您的用例,您可能可以使用 device_id 作为分区键来区分您可以使用一个主题的 messages.That 方式。
如果需要,您可以从 java 创建主题。是否推荐,取决于用例。例如。如果您的主题名称是生产者传入有效负载的函数,它可能会有用。以下是适用于 kafka 0 的代码片段。10.x
void createTopic(String zookeeperConnect, String topicName) throws InterruptedException {
int sessionTimeoutMs = <some-int-value>;
int connectionTimeoutMs = <some-int-value>;
ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
Properties topicConfig = new Properties();
try {
AdminUtils.createTopic(zkUtils, topicName, 1, 1, topicConfig,
RackAwareMode.Disabled$.MODULE$);
} catch (TopicExistsException ex) {
//log it
}
zkClient.close();
}
注意:只允许增加编号。的分区。
目前我正在评估不同的消息系统。 有一个与 Apache Kafka 相关的问题,我自己无法回答。
Kafka 生产者是否可以动态创建主题和分区(以及在现有主题上)? 如果有,有什么缺点吗?
提前致谢
已更新:
kafka 代理有一个 属性: auto.create.topics.enable
如果您将其设置为 true,如果生产者使用新主题名称向主题发布消息,它将自动为您创建一个主题。
Confluent 团队建议不要这样做,因为根据您的环境,主题的爆炸式增长可能会变得笨拙,并且主题创建在创建时将始终具有相同的默认值。重要的是复制因子至少为 3,以确保您的主题在发生磁盘故障时的持久性。
当你启动你的 kafka 代理时,你可以在 conf/server.properties
文件中定义一堆属性。 属性 之一是 auto.create.topics.enable
如果您将此设置为 true(默认情况下),当您向不存在的主题发送消息时,kafka 将自动创建一个主题。分区号将由同一文件中的默认设置定义。
缺点:据我所知,以这种方式创建的主题将始终具有相同的默认设置(分区、副本...)。
对于任何消息传递系统,我认为不建议使用生产者动态创建 topic/partition 或任何队列的方法。
对于您的用例,您可能可以使用 device_id 作为分区键来区分您可以使用一个主题的 messages.That 方式。
如果需要,您可以从 java 创建主题。是否推荐,取决于用例。例如。如果您的主题名称是生产者传入有效负载的函数,它可能会有用。以下是适用于 kafka 0 的代码片段。10.x
void createTopic(String zookeeperConnect, String topicName) throws InterruptedException {
int sessionTimeoutMs = <some-int-value>;
int connectionTimeoutMs = <some-int-value>;
ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
Properties topicConfig = new Properties();
try {
AdminUtils.createTopic(zkUtils, topicName, 1, 1, topicConfig,
RackAwareMode.Disabled$.MODULE$);
} catch (TopicExistsException ex) {
//log it
}
zkClient.close();
}
注意:只允许增加编号。的分区。