Akka Stream Kafka:找不到键 'kafka-clients' 的配置设置

Akka Stream Kafka: No configuration setting found for key 'kafka-clients'

我正在尝试使用 Alpakka Kafka connector (Akka Stream Kafka) 创建一个简单的原型。

当 运行 应用程序时,我收到以下错误:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'

我在./src/main/scala/App.scala中有以下代码:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system = ActorSystem("fakeProducer")
    implicit val materializer: Materializer = ActorMaterializer()

    val config = system.settings.config // ConfigFactory.load()

    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
        .runWith(Producer.plainSink(producerSettings))


    println("Done")
  }
}

以下build.sbt

name := "test-akka-stream"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

我 运行 该应用程序使用 sbt run。我还没有配置任何 uber/assembly jar。

我可能遗漏了一些明显的东西,但我看不到它...我怀疑 akka 依赖项存在一些问题。

更新

正如@terminally-chill 所建议的,调用 ProducerSettings(system, new StringSerializer, new StringSerializer)(传递 ActorSystem 而不是配置)解决了问题。我只是不明白这是设计使然还是错误。

更新 2

我创建了一个 github issue 并且已经修复。现在文档更加准确,并解释了创建 ProducerSettings/ConsumerSettings.

的正确方法
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

或者您可以通过 ActorSystem 如上所述。

通常我将所有配置保存在 AkkaSystem 实例中。我没有将 Alpakka 用于 Kafka,但我的许多实现都基于源代码。

使用 val config = ConfigFactory.load() 加载类型安全配置对象,然后将 config 传递给 val system = ActorSystem("fakeProducer", config)

最后,将system.settings.config传递给ProducerSettings

您当前的代码没有传递任何设置,因为您还没有将配置加载到您的 Akka 系统中。您的 val config = system.settings.config 引用了一个空配置,其中没有 kafka-clients 部分(最佳猜测)。

我想我 运行 遇到了与您相同的问题(几乎在同一时间),尽管我正在尝试创建一个基本的 'hello world' Kafka 消费者而不是生产者。我猜您只是浏览了 Alpakka Kafka connector documentation 中的文档,并遵循了他们首先定义

的示例
val config = system.settings.config

然后将其传递给新的 ConsumerSettings 对象。我猜想在线文档存在缺陷,但我对 Akka Streams 还很陌生(这是我第一次尝试通过示例学习),所以我没有资格弄清楚什么是对的,什么是错的。

我曾尝试创建 application.conf 文件,然后执行 ConfigFactory.load() 然后在创建时手动将其传递给 ActorSystem,然后将该系统传递给 ConsumerSettings 构造函数,关于丢失 "kafka-clients" 的错误消失了,但显然我什至不必这样做。正如您所说,只需传递 'system' 变量而不是 'config' 变量就可以了。

希望这对同样在黑暗中摸索的人有所帮助。我不得不说,对于 Akka Streams 周围的所有讨论,似乎确实缺乏文档。一旦我弄清楚了这些东西,我可能不得不写一篇博客文章!

感谢@terminally-chill 和@murray-todd-williams 的回答。我做了一些进一步的研究,我试着在这里总结一下:

ConsumerSettingsProducerSettings 都具有采用 Configapply 函数(参见 here) or an ActorSystem (see here)。

问题是当使用 ActorSystem 时代码是:

val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload

而当使用 Config 时,代码是:

val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))

因此,当直接传递配置时,代码会搜索 kafka-clients 属性,而不是当传递 ActorSystem 时,代码会检查 akka.kafka.consumer/akka.kafka.producer.

最后考虑一下,在默认情况下创建 ActorSystem 实例时,大多数设置都是从​​嵌入式 reference.conf 文件加载的,如果存在,则与您的 application.conf 文件合并。更多信息 here。 所以基本上唯一需要设置的 属性 通常是 bootstrap.servers.

所以您现在可以理解为什么在使用 system.settings.config 时代码不起作用。此配置实例已加载 reference.conf(具有所有默认值,请参阅 here)和自定义 application.confkafka-clients 属性 在 akka.kafka.consumer/akka.kafka.producer 里面,但是代码直接检查 kafka-clients.

一些可能的解决方案:

  • 使用另一个重载
  • 直接传递ActorSystem
  • 使用 system.settings.config.getConfig("akka.kafka.consumer")
  • 通过正确的部分
  • 使用 kafka-clients 部分手动构造一个 Config 实例

对我来说,问题是提供的官方文档 here 没有提及这些差异,并且提供的示例不完整 and/or 不准确。对于 Akka 专家来说,这可能很清楚,但对于新开发人员来说,这可能会非常混乱。

我创建了一个更多 "ready to use" 示例 in this gist and open an issue

感谢您注意到并提交 Alpakka Kafka 连接器项目中的问题。 文档现已更新:https://doc.akka.io/docs/akka-stream-kafka/current/producer.html