Spark "Failed to construct kafka consumer" 通过 SSL

Spark "Failed to construct kafka consumer" via SSL

我正在尝试设置一个 Spark 作业来消费来自 Kafka 的数据。Kafka broker(代理)启用了 SSL,但我无法正确地构建(build)消费者并完成认证(authenticate)。

spark-shell 命令:

spark-2.3.4-bin-hadoop2.7/bin/spark-shell 
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 
    --files "spark-kafka.jaas"   
    --driver-java-options "-Djava.security.auth.login.config=./spark-kafka.jaas"   
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark-kafka.jaas"

spark-kafka.jaas

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="myusername"
   password="mypwd"
};

Shell 命令:

val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1, host2:port2")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.ssl.truststore.location", "./truststore.jks")
    .option("kafka.ssl.truststore.password", "truststore-pwd")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option("subscribe", "mytopic")
    .option("startingOffsets", "earliest")
    .load()

df.show()

错误:

2019-09-23 16:32:19 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
  at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
  at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:62)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:63)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$$anonfun$apply.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$$anonfun$apply.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$$anonfun$apply.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$$anonfun$apply.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2495)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2709)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:729)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:688)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:697)
  ... 49 elided
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: Configuration Error:
    Line 5: expected [option key]
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
  at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
  at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
  ... 99 more
Caused by: java.lang.SecurityException: java.io.IOException: Configuration Error:
    Line 5: expected [option key]
  at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
  at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at java.lang.Class.newInstance(Class.java:442)
  at javax.security.auth.login.Configuration.run(Configuration.java:255)
  at javax.security.auth.login.Configuration.run(Configuration.java:247)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
  at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:61)
  at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
  at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
  ... 102 more
Caused by: java.io.IOException: Configuration Error:
    Line 5: expected [option key]
  at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
  at sun.security.provider.ConfigFile$Spi.match(ConfigFile.java:562)
  at sun.security.provider.ConfigFile$Spi.parseLoginEntry(ConfigFile.java:477)
  at sun.security.provider.ConfigFile$Spi.readConfig(ConfigFile.java:427)
  at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:329)
  at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:271)
  at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
  ... 116 more

错误消息 "Configuration Error: Line 5: expected [option key]"(配置错误:第 5 行:预期 [option key])表明您的 jaas.conf 文件存在语法问题:很可能是 `password="mypwd"` 之后缺少分号。JAAS 语法要求每个 LoginModule 条目的最后一个选项以 `;` 结尾,即应写成 `password="mypwd";`,否则解析器在读到第 5 行的 `}` 时会报 "expected [option key]"。另外请注意一处不一致:您的 jaas.conf 使用了 `PlainLoginModule`,而消费者配置中 `kafka.sasl.mechanism` 设置为 `SCRAM-SHA-256`;若确实使用 SCRAM 机制,JAAS 中应改用 `org.apache.kafka.common.security.scram.ScramLoginModule required`,否则两者必须保持一致(例如都用 PLAIN)。