为什么 spark-submit 失败并显示 "AnalysisException: kafka is not a valid Spark SQL Data Source"?
Why does spark-submit fail with "AnalysisException: kafka is not a valid Spark SQL Data Source"?
我使用 Spark 2.1.0 和 Kafka 0.10.2.1。
我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。
代码如下:
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class MLP {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("MLP")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
.option("subscribe", "resultsTopic")
.load();
df.show();
spark.stop();
}
}
我的部署脚本如下:
spark-submit \
--verbose \
--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
--class com.**** \
--master (Spark Master URL) /path/to/jar
但是我得到错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;
我试过将同一个应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也试过在客户端模式下使用纱线,但我得到了同样的错误。
Kafka 作为非流 DataFrame 的数据源 - Spark 2.2 将提供数据集,reference in this issue on Spark JIRA
如@JacekLaskowski 所述,将包更改为(修改 Jacek 的版本以使用 2.2):
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
更重要的是,使用readStream
读取数据流
您不能将 show
用于流数据源,而是使用 console
格式。
StreamingQuery query = df.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
首先,您应该将 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10
(我怀疑它有效)替换为以下内容:
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1
我认为版本 2.10
从未可用过。您可能已经考虑过 2.1.0
如果您使用 2.1.0
(而不是 2.10
)可能会起作用。
其次,删除 Spark 加载的 --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',')
,除了一些额外的 jar,例如用于 Kafka 源的 jar。
您应该可以访问 kafka
源格式。
我使用 Spark 2.1.0 和 Kafka 0.10.2.1。
我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。
代码如下:
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class MLP {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("MLP")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
.option("subscribe", "resultsTopic")
.load();
df.show();
spark.stop();
}
}
我的部署脚本如下:
spark-submit \
--verbose \
--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
--class com.**** \
--master (Spark Master URL) /path/to/jar
但是我得到错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;
我试过将同一个应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也试过在客户端模式下使用纱线,但我得到了同样的错误。
Kafka 作为非流 DataFrame 的数据源 - Spark 2.2 将提供数据集,reference in this issue on Spark JIRA
如@JacekLaskowski 所述,将包更改为(修改 Jacek 的版本以使用 2.2):
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
更重要的是,使用readStream
读取数据流
您不能将 show
用于流数据源,而是使用 console
格式。
StreamingQuery query = df.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
首先,您应该将 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10
(我怀疑它有效)替换为以下内容:
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1
我认为版本 2.10
从未可用过。您可能已经考虑过 2.1.0
如果您使用 2.1.0
(而不是 2.10
)可能会起作用。
其次,删除 Spark 加载的 --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',')
,除了一些额外的 jar,例如用于 Kafka 源的 jar。
您应该可以访问 kafka
源格式。