Spring Kafka NewTopic TopicBuilder,--command-config 选项?
Spring Kafka NewTopic TopicBuilder, --command-config option?
当我使用 kafka-topics.bat
创建主题时,我会这样做:
kafka-topics.bat --bootstrap-server %host%:%port% --create --topic %%t
--partitions %partitions% --replication-factor %replication_factor% --config max.message.bytes=%max_message_bytes% --config min.insync.replicas=%min_insync_replicas% --config
retention.ms=%retention_ms% --command-config client.properties
我正在尝试使用 2.3 中引入的 Spring Kafka TopicBuilder
转换 以上内容。但我不知道如何转换 command-config
选项。可能吗?
其余的直接按照 documentation:
@Bean
public NewTopic topic(){
return TopicBuilder.name("topic-name")
.partitions(x)
.replicas(x)
.config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "xxx")
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "xxx")
.config(TopicConfig.RETENTION_MS_CONFIG, "xxx")
.build();
}
终于找到了!如果它对某人有帮助,这里是解决方案:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=username " +
"password=password;");
return new KafkaAdmin(configs);
}
干杯
当我使用 kafka-topics.bat
创建主题时,我会这样做:
kafka-topics.bat --bootstrap-server %host%:%port% --create --topic %%t --partitions %partitions% --replication-factor %replication_factor% --config max.message.bytes=%max_message_bytes% --config min.insync.replicas=%min_insync_replicas% --config retention.ms=%retention_ms% --command-config client.properties
我正在尝试使用 2.3 中引入的 Spring Kafka TopicBuilder
转换 以上内容。但我不知道如何转换 command-config
选项。可能吗?
其余的直接按照 documentation:
@Bean
public NewTopic topic(){
return TopicBuilder.name("topic-name")
.partitions(x)
.replicas(x)
.config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "xxx")
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "xxx")
.config(TopicConfig.RETENTION_MS_CONFIG, "xxx")
.build();
}
终于找到了!如果它对某人有帮助,这里是解决方案:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=username " +
"password=password;");
return new KafkaAdmin(configs);
}
干杯