为什么 spark-submit 失败并显示 "AnalysisException: kafka is not a valid Spark SQL Data Source"?

Why does spark-submit fail with "AnalysisException: kafka is not a valid Spark SQL Data Source"?

我使用 Spark 2.1.0 和 Kafka 0.10.2.1。

我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。

代码如下:

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MLP {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("MLP")
            .getOrCreate();

        Dataset<Row> df = spark
            .read()
            .format("kafka")
            .option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
            .option("subscribe", "resultsTopic")
            .load();
        df.show();
        spark.stop();
    }
}

我的部署脚本如下:

spark-submit \
  --verbose \
  --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
  --class com.**** \
  --master (Spark Master URL) /path/to/jar 

但是我得到错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;

我试过将同一个应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也试过在客户端模式下使用纱线,但我得到了同样的错误。

Kafka 作为非流 DataFrame 的数据源 - Spark 2.2 将提供数据集,reference in this issue on Spark JIRA

如@JacekLaskowski 所述,将包更改为(修改 Jacek 的版本以使用 2.2):

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

更重要的是,使用readStream读取数据流

您不能将 show 用于流数据源,而是使用 console 格式。

StreamingQuery query = df.writeStream()
  .outputMode("append")
  .format("console")
  .start();

query.awaitTermination();

See this link

首先,您应该将 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10(我怀疑它有效)替换为以下内容:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1

我认为版本 2.10 从未可用过。您可能已经考虑过 2.1.0 如果您使用 2.1.0(而不是 2.10)可能会起作用。

其次,删除 Spark 加载的 --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ','),除了一些额外的 jar,例如用于 Kafka 源的 jar。

您应该可以访问 kafka 源格式。