如何从 akka-stream-kafka 中的配置(application.conf)创建 ProducerSettings?
How to create ProducerSettings from configuration(application.conf) in akka-stream-kafka?
我正在尝试学习和使用 akka-stream-kafka
并且正在阅读它的 [文档][1]。在 Producer settings 部分,它告诉我们可以使用编程方式和配置来创建 ProducerSettings
。有程序化构造的示例,但没有说明如何通过配置创建它的示例。编程构造很简单,下面是一个例子。但是我想使用配置基础结构并希望配置来自 application.conf
因为它会给我更多的控制权。我似乎无法在 google 上找到它的示例。
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
文档只是将您转到 ProducerConfig 的 Apache Kafka Javadoc,因为它包含一堆常量,您可以在 akka.kafka.producer.kafka-clients
配置部分中用作键。
扩展文档中的参考配置,例如:
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
# Tuning parameter of how many sends that can run in parallel.
parallelism = 100
# How long to wait for `KafkaProducer.close`
close-timeout = 60s
# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
# will be used.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers = "localhost:9092"
enable.auto.commit = true
auto.commit.interval.ms = 10000
acks = "all"
retries = 0
batch.size = 16384
}
}
你的 application.conf
文件的内容将默认由你的 ActorSystem
加载,所以每当你按照下面创建一个 ProducerSettings
对象时,它应该从 akka.kafka.producer
。您不需要显式地将配置传递给构造函数。
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
我正在尝试学习和使用 akka-stream-kafka
并且正在阅读它的 [文档][1]。在 Producer settings 部分,它告诉我们可以使用编程方式和配置来创建 ProducerSettings
。有程序化构造的示例,但没有说明如何通过配置创建它的示例。编程构造很简单,下面是一个例子。但是我想使用配置基础结构并希望配置来自 application.conf
因为它会给我更多的控制权。我似乎无法在 google 上找到它的示例。
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
文档只是将您转到 ProducerConfig 的 Apache Kafka Javadoc,因为它包含一堆常量,您可以在 akka.kafka.producer.kafka-clients
配置部分中用作键。
扩展文档中的参考配置,例如:
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
# Tuning parameter of how many sends that can run in parallel.
parallelism = 100
# How long to wait for `KafkaProducer.close`
close-timeout = 60s
# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
# will be used.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers = "localhost:9092"
enable.auto.commit = true
auto.commit.interval.ms = 10000
acks = "all"
retries = 0
batch.size = 16384
}
}
你的 application.conf
文件的内容将默认由你的 ActorSystem
加载,所以每当你按照下面创建一个 ProducerSettings
对象时,它应该从 akka.kafka.producer
。您不需要显式地将配置传递给构造函数。
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)