如何从 akka-stream-kafka 中的配置(application.conf)创建 ProducerSettings?

How to create ProducerSettings from configuration(application.conf) in akka-stream-kafka?

我正在尝试学习和使用 akka-stream-kafka 并且正在阅读它的 [文档][1]。在 Producer settings 部分,它告诉我们可以使用编程方式和配置来创建 ProducerSettings。有程序化构造的示例,但没有说明如何通过配置创建它的示例。编程构造很简单,下面是一个例子。但是我想使用配置基础结构并希望配置来自 application.conf 因为它会给我更多的控制权。我似乎无法在 google 上找到它的示例。

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

文档只是将您转到 ProducerConfig 的 Apache Kafka Javadoc,因为它包含一堆常量,您可以在 akka.kafka.producer.kafka-clients 配置部分中用作键。

扩展文档中的参考配置,例如:

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100

  # How long to wait for `KafkaProducer.close`
  close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    enable.auto.commit = true
    auto.commit.interval.ms = 10000
    acks = "all"
    retries = 0
    batch.size = 16384
  }
}

你的 application.conf 文件的内容将默认由你的 ActorSystem 加载,所以每当你按照下面创建一个 ProducerSettings 对象时,它应该从 akka.kafka.producer。您不需要显式地将配置传递给构造函数。

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)