连接 Pyspark 和 Kafka
Connecting Pyspark with Kafka
我在理解如何连接 Kafka 和 PySpark 时遇到问题。
我在 Windows 10 上安装了 kafka,主题很好地流式传输数据。
我已经安装了可以正常运行的 pyspark - 我能够毫无问题地创建测试 DataFrame。
但是当我尝试连接到 Kafka 流时出现错误:
AnalysisException: Failed to find data source: kafka. Please deploy
the application as per the deployment section of "Structured Streaming-
Kafka Integration Guide".
Spark 文档并不是很有帮助——它说:
...
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
版本 = 3.2.0
...
对于Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。
然后当您转到“部署”部分时,它会显示:
与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 spark-sql-kafka-0-10_2.12及其依赖可以直接添加到spark-submit中使用--packages,例如,
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ...
我正在开发应用程序,我不想部署它。
如果我正在开发 pyspark 应用程序,在哪里以及如何添加这些依赖项?
试了几个教程最后还是比较糊涂。
看到回答说
"You need to add kafka-clients JAR to your --packages".so-answer
更多的步骤可能会有用,因为对于新手来说这还不清楚。
版本:
- 卡夫卡 2.13-2.8.1
- 火花 3.1.2
- java 11.0.12
所有环境变量和路径都已正确设置。
编辑
我已经加载了:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'
按照建议但仍然出现相同的错误。
我已经三次检查了 kafka、scala 和 spark 版本并尝试了各种组合,但没有成功,我仍然遇到相同的错误:
AnalysisException: Failed to find data source: kafka. Please deploy
the application as per the deployment section of "Structured Streaming-Kafka Integration Guide".
编辑 2
我安装了最新的 Spark 3.2.0 和 Hadoop 3.3.1 以及 kafka 版本 kafka_2.12-2.8.1。更改了所有环境变量,测试了 Spark 和 Kafka - 工作正常。
我的环境变量现在看起来像这样:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'
仍然没有运气,我得到同样的错误:(
Spark documentation is not really helpful - it says ... artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...
是的,这是正确的...但是对于最新版本的 Spark
versions:
- spark 3.1.2
您是否尝试查看 version specific docs?
换句话说,您想要匹配的 spark-sql-kafka
版本 3.1.2.
bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
或在Python,
import os
spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
# init spark here
need to add this above library and its dependencies
如您在我之前的回答中所见,还使用逗号分隔列表附加 kafka-clients
包。
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1
I'm developing app, I don't want to deploy it.
“部署”是 Spark 术语。 运行本地还是一个“deployment”
我在理解如何连接 Kafka 和 PySpark 时遇到问题。
我在 Windows 10 上安装了 kafka,主题很好地流式传输数据。 我已经安装了可以正常运行的 pyspark - 我能够毫无问题地创建测试 DataFrame。
但是当我尝试连接到 Kafka 流时出现错误:
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming- Kafka Integration Guide".
Spark 文档并不是很有帮助——它说: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 版本 = 3.2.0 ...
对于Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。
然后当您转到“部署”部分时,它会显示:
与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 spark-sql-kafka-0-10_2.12及其依赖可以直接添加到spark-submit中使用--packages,例如, ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ...
我正在开发应用程序,我不想部署它。 如果我正在开发 pyspark 应用程序,在哪里以及如何添加这些依赖项?
试了几个教程最后还是比较糊涂。
看到回答说
"You need to add kafka-clients JAR to your --packages".so-answer
更多的步骤可能会有用,因为对于新手来说这还不清楚。
版本:
- 卡夫卡 2.13-2.8.1
- 火花 3.1.2
- java 11.0.12
所有环境变量和路径都已正确设置。
编辑
我已经加载了:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'
按照建议但仍然出现相同的错误。 我已经三次检查了 kafka、scala 和 spark 版本并尝试了各种组合,但没有成功,我仍然遇到相同的错误:
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming-Kafka Integration Guide".
编辑 2
我安装了最新的 Spark 3.2.0 和 Hadoop 3.3.1 以及 kafka 版本 kafka_2.12-2.8.1。更改了所有环境变量,测试了 Spark 和 Kafka - 工作正常。
我的环境变量现在看起来像这样:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'
仍然没有运气,我得到同样的错误:(
Spark documentation is not really helpful - it says ... artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...
是的,这是正确的...但是对于最新版本的 Spark
versions:
- spark 3.1.2
您是否尝试查看 version specific docs?
换句话说,您想要匹配的 spark-sql-kafka
版本 3.1.2.
bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
或在Python,
import os
spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
# init spark here
need to add this above library and its dependencies
如您在我之前的回答中所见,还使用逗号分隔列表附加 kafka-clients
包。
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1
I'm developing app, I don't want to deploy it.
“部署”是 Spark 术语。 运行本地还是一个“deployment”