How to create several partitions with Alpakka
I am trying to create a simple producer that creates a topic with a number of partitions provided by configuration.
According to the Alpakka Producer Settings documentation, any property from org.apache.kafka.clients.producer.ProducerConfig can be set in the kafka-clients section. And there is a num.partitions property, as noted in the Producer API docs.
So I added the property to my application.conf file as follows:
topic = "topic"
topic = ${?TOPIC}
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
# Tuning parameter of how many sends that can run in parallel.
parallelism = 100
parallelism = ${?PARALLELISM}
# Duration to wait for `KafkaConsumer.close` to finish.
close-timeout = 20s
# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
# will be used.
use-dispatcher = "akka.kafka.default-dispatcher"
# The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
eos-commit-interval = 100ms
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers = "my-kafka:9092"
bootstrap.servers = ${?BOOTSTRAPSERVERS}
num.partitions = "3"
num.partitions = ${?NUM_PARTITIONS}
}
}
The producer application code is as follows:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object Main extends App {
val config = ConfigFactory.load()
implicit val system: ActorSystem = ActorSystem("producer")
implicit val materializer: Materializer = ActorMaterializer()
val producerConfigs = config.getConfig("akka.kafka.producer")
val producerSettings = ProducerSettings(producerConfigs, new StringSerializer, new StringSerializer)
val topic = config.getString("topic")
val done: Future[Done] =
Source(1 to 100000)
.map(_.toString)
.map(value => new ProducerRecord[String, String](topic, value))
.runWith(Producer.plainSink(producerSettings))
implicit val ec: ExecutionContextExecutor = system.dispatcher
done onComplete {
case Success(_) => println("Done"); system.terminate()
case Failure(err) => println(err.toString); system.terminate()
}
}
However, this does not work. The producer creates a topic with a single partition instead of the 3 partitions I set via configuration:
num.partitions = "3"
Finally, the kafkacat output is as follows:
~$ kafkacat -b my-kafka:9092 -L
Metadata for all topics (from broker -1: my-kafka:9092/bootstrap):
3 brokers:
broker 2 at my-kafka-2.my-kafka-headless.default:9092
broker 1 at my-kafka-1.my-kafka-headless.default:9092
broker 0 at my-kafka-0.my-kafka-headless.default:9092
1 topics:
topic "topic" with 1 partitions:
partition 0, leader 2, replicas: 2, isrs: 2
What is wrong? Is it possible to set Kafka Producer API properties in the kafka-clients section with Alpakka?
It seems the topic is being auto-created, which is Kafka's default behavior. If that is the case, you need to define the default number of partitions for your brokers in the server.properties file:
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
As that comment says, ProducerConfig is for producer settings, not broker settings, and num.partitions is a broker setting (I think you got lost in the property table shown in the Apache Kafka documentation... scroll to the top of it to see the correct header).
It is not possible to set a topic's partition count from the producer. You need to create the topic with the AdminClient class; the number of partitions is a parameter there, not a configuration property.
Sample code:
import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.common.config.TopicConfig

import scala.collection.JavaConverters._

val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val adminClient = AdminClient.create(props)

val numPartitions = 3
val replicationFactor = 3.toShort
val newTopic = new NewTopic("new-topic-name", numPartitions, replicationFactor)

// set some topic-level configs
val configs = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "gzip")
newTopic.configs(configs.asJava)

adminClient.createTopics(List(newTopic).asJavaCollection)
Then you can start the producer.
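To tie the two steps together, here is a minimal sketch (assuming the question's my-kafka:9092 bootstrap servers and "topic" topic name; createTopics returns futures, so you can block on them to make sure the topic exists before the producer sends its first record):

```scala
import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

import scala.collection.JavaConverters._

object EnsureTopic {
  // Creates the topic and blocks until the broker confirms it.
  // Throws if the topic already exists, so call it only on first deployment
  // or catch TopicExistsException.
  def ensureTopic(bootstrap: String, topic: String,
                  partitions: Int, replication: Short): Unit = {
    val props = new Properties()
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
    val admin = AdminClient.create(props)
    try {
      admin
        .createTopics(List(new NewTopic(topic, partitions, replication)).asJavaCollection)
        .all()
        .get() // block until creation completes (or fails)
    } finally admin.close()
  }
}

// Call EnsureTopic.ensureTopic("my-kafka:9092", "topic", 3, 3.toShort)
// before running Producer.plainSink, so records land on an existing
// 3-partition topic instead of triggering auto-creation with 1 partition.
```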