Akka Stream Kafka:找不到键 'kafka-clients' 的配置设置
Akka Stream Kafka: No configuration setting found for key 'kafka-clients'
我正在尝试使用 Alpakka Kafka connector (Akka Stream Kafka) 创建一个简单的原型。
当 运行 应用程序时,我收到以下错误:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
我在./src/main/scala/App.scala
中有以下代码:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")
implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config // ConfigFactory.load()
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))
println("Done")
}
}
以下build.sbt
:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
我 运行 该应用程序使用 sbt run
。我还没有配置任何 uber/assembly jar。
我可能遗漏了一些明显的东西,但我看不到它...我怀疑 akka 依赖项存在一些问题。
更新
正如@terminally-chill 所建议的,调用 ProducerSettings(system, new StringSerializer, new StringSerializer)
(传递 ActorSystem
而不是配置)解决了问题。我只是不明白这是设计使然还是错误。
更新 2
我创建了一个 github issue 并且已经修复。现在文档更加准确,并解释了创建 ProducerSettings
/ConsumerSettings
.
的正确方法
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
或者您可以通过 ActorSystem
如上所述。
通常我将所有配置保存在 AkkaSystem 实例中。我没有将 Alpakka 用于 Kafka,但我的许多实现都基于源代码。
使用 val config = ConfigFactory.load()
加载类型安全配置对象,然后将 config
传递给 val system = ActorSystem("fakeProducer", config)
。
最后,将system.settings.config
传递给ProducerSettings
。
您当前的代码没有传递任何设置,因为您还没有将配置加载到您的 Akka 系统中。您的 val config = system.settings.config
引用了一个空配置,其中没有 kafka-clients 部分(最佳猜测)。
我想我 运行 遇到了与您相同的问题(几乎在同一时间),尽管我正在尝试创建一个基本的 'hello world' Kafka 消费者而不是生产者。我猜您只是浏览了 Alpakka Kafka connector documentation 中的文档,并遵循了他们首先定义
的示例
val config = system.settings.config
然后将其传递给新的 ConsumerSettings 对象。我猜想在线文档存在缺陷,但我对 Akka Streams 还很陌生(这是我第一次尝试通过示例学习),所以我没有资格弄清楚什么是对的,什么是错的。
我曾尝试创建 application.conf 文件,然后执行 ConfigFactory.load() 然后在创建时手动将其传递给 ActorSystem,然后将该系统传递给 ConsumerSettings 构造函数,关于丢失 "kafka-clients" 的错误消失了,但显然我什至不必这样做。正如您所说,只需传递 'system' 变量而不是 'config' 变量就可以了。
希望这对同样在黑暗中摸索的人有所帮助。我不得不说,对于 Akka Streams 周围的所有讨论,似乎确实缺乏文档。一旦我弄清楚了这些东西,我可能不得不写一篇博客文章!
感谢@terminally-chill 和@murray-todd-williams 的回答。我做了一些进一步的研究,我试着在这里总结一下:
ConsumerSettings
和 ProducerSettings
都具有采用 Config
的 apply
函数(参见 here) or an ActorSystem
(see here)。
问题是当使用 ActorSystem
时代码是:
val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload
而当使用 Config
时,代码是:
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
因此,当直接传递配置时,代码会搜索 kafka-clients
属性,而不是当传递 ActorSystem
时,代码会检查 akka.kafka.consumer/akka.kafka.producer
.
最后考虑一下,在默认情况下创建 ActorSystem
实例时,大多数设置都是从嵌入式 reference.conf
文件加载的,如果存在,则与您的 application.conf
文件合并。更多信息 here。
所以基本上唯一需要设置的 属性 通常是 bootstrap.servers
.
所以您现在可以理解为什么在使用 system.settings.config
时代码不起作用。此配置实例已加载 reference.conf
(具有所有默认值,请参阅 here)和自定义 application.conf
。 kafka-clients
属性 在 akka.kafka.consumer/akka.kafka.producer
里面,但是代码直接检查 kafka-clients
.
一些可能的解决方案:
- 使用另一个重载
直接传递ActorSystem
- 使用
system.settings.config.getConfig("akka.kafka.consumer")
通过正确的部分
- 使用
kafka-clients
部分手动构造一个 Config
实例
对我来说,问题是提供的官方文档 here 没有提及这些差异,并且提供的示例不完整 and/or 不准确。对于 Akka 专家来说,这可能很清楚,但对于新开发人员来说,这可能会非常混乱。
我创建了一个更多 "ready to use" 示例 in this gist and open an issue。
感谢您注意到并提交 Alpakka Kafka 连接器项目中的问题。
文档现已更新:https://doc.akka.io/docs/akka-stream-kafka/current/producer.html
我正在尝试使用 Alpakka Kafka connector (Akka Stream Kafka) 创建一个简单的原型。
当 运行 应用程序时,我收到以下错误:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
我在./src/main/scala/App.scala
中有以下代码:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")
implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config // ConfigFactory.load()
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))
println("Done")
}
}
以下build.sbt
:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
我 运行 该应用程序使用 sbt run
。我还没有配置任何 uber/assembly jar。
我可能遗漏了一些明显的东西,但我看不到它...我怀疑 akka 依赖项存在一些问题。
更新
正如@terminally-chill 所建议的,调用 ProducerSettings(system, new StringSerializer, new StringSerializer)
(传递 ActorSystem
而不是配置)解决了问题。我只是不明白这是设计使然还是错误。
更新 2
我创建了一个 github issue 并且已经修复。现在文档更加准确,并解释了创建 ProducerSettings
/ConsumerSettings
.
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
或者您可以通过 ActorSystem
如上所述。
通常我将所有配置保存在 AkkaSystem 实例中。我没有将 Alpakka 用于 Kafka,但我的许多实现都基于源代码。
使用 val config = ConfigFactory.load()
加载类型安全配置对象,然后将 config
传递给 val system = ActorSystem("fakeProducer", config)
。
最后,将system.settings.config
传递给ProducerSettings
。
您当前的代码没有传递任何设置,因为您还没有将配置加载到您的 Akka 系统中。您的 val config = system.settings.config
引用了一个空配置,其中没有 kafka-clients 部分(最佳猜测)。
我想我 运行 遇到了与您相同的问题(几乎在同一时间),尽管我正在尝试创建一个基本的 'hello world' Kafka 消费者而不是生产者。我猜您只是浏览了 Alpakka Kafka connector documentation 中的文档,并遵循了他们首先定义
的示例val config = system.settings.config
然后将其传递给新的 ConsumerSettings 对象。我猜想在线文档存在缺陷,但我对 Akka Streams 还很陌生(这是我第一次尝试通过示例学习),所以我没有资格弄清楚什么是对的,什么是错的。
我曾尝试创建 application.conf 文件,然后执行 ConfigFactory.load() 然后在创建时手动将其传递给 ActorSystem,然后将该系统传递给 ConsumerSettings 构造函数,关于丢失 "kafka-clients" 的错误消失了,但显然我什至不必这样做。正如您所说,只需传递 'system' 变量而不是 'config' 变量就可以了。
希望这对同样在黑暗中摸索的人有所帮助。我不得不说,对于 Akka Streams 周围的所有讨论,似乎确实缺乏文档。一旦我弄清楚了这些东西,我可能不得不写一篇博客文章!
感谢@terminally-chill 和@murray-todd-williams 的回答。我做了一些进一步的研究,我试着在这里总结一下:
ConsumerSettings
和 ProducerSettings
都具有采用 Config
的 apply
函数(参见 here) or an ActorSystem
(see here)。
问题是当使用 ActorSystem
时代码是:
val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload
而当使用 Config
时,代码是:
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
因此,当直接传递配置时,代码会搜索 kafka-clients
属性,而不是当传递 ActorSystem
时,代码会检查 akka.kafka.consumer/akka.kafka.producer
.
最后考虑一下,在默认情况下创建 ActorSystem
实例时,大多数设置都是从嵌入式 reference.conf
文件加载的,如果存在,则与您的 application.conf
文件合并。更多信息 here。
所以基本上唯一需要设置的 属性 通常是 bootstrap.servers
.
所以您现在可以理解为什么在使用 system.settings.config
时代码不起作用。此配置实例已加载 reference.conf
(具有所有默认值,请参阅 here)和自定义 application.conf
。 kafka-clients
属性 在 akka.kafka.consumer/akka.kafka.producer
里面,但是代码直接检查 kafka-clients
.
一些可能的解决方案:
- 使用另一个重载 直接传递
- 使用
system.settings.config.getConfig("akka.kafka.consumer")
通过正确的部分
- 使用
kafka-clients
部分手动构造一个Config
实例
ActorSystem
对我来说,问题是提供的官方文档 here 没有提及这些差异,并且提供的示例不完整 and/or 不准确。对于 Akka 专家来说,这可能很清楚,但对于新开发人员来说,这可能会非常混乱。
我创建了一个更多 "ready to use" 示例 in this gist and open an issue。
感谢您注意到并提交 Alpakka Kafka 连接器项目中的问题。 文档现已更新:https://doc.akka.io/docs/akka-stream-kafka/current/producer.html