Kafka integration with PySpark Structured Streaming (Windows)
After installing Anaconda on my Windows 10 machine, I followed this tutorial to set up PySpark and run it with Jupyter: https://changhsinlee.com/install-pyspark-windows-jupyter/
- The Spark version is 3.1.2 and Python is 3.8.8, so they are compatible.
Now, to integrate Kafka with PySpark, here is my code:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
kafka_topic_name = "test_spark"
kafka_bootstrap_servers = '192.168.1.3:9092'
spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
    .master("local[*]") \
    .getOrCreate()

# Construct a streaming DataFrame that reads from the test_spark topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()
At this point an error is shown saying that I need to deploy the connector:
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide"
I then went to that page and found that the command to deploy it is: ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...
- I navigated to where my Spark folder is located and ran the command in PowerShell as administrator, but I got the following error:
PS D:\Spark\spark-3.1.2-bin-hadoop3.2> .\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...
:: loading settings :: url = jar:file:/D:/Spark/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\T460S\.ivy2\cache
The jars for the packages stored in: C:\Users\T460S\.ivy2\jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-03401043-c6c7-40dd-8667-8001083bfb4c;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
found org.apache.kafka#kafka-clients;2.6.0 in central
found com.github.luben#zstd-jni;1.4.8-1 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.8.2 in central
found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 595ms :: artifacts dl 19ms
:: modules in use:
com.github.luben#zstd-jni;1.4.8-1 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.6.0 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 from central in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 from central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from local-m2-cache in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.8.2 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 9 | 0 | 0 | 0 || 9 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-03401043-c6c7-40dd-8667-8001083bfb4c
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/19ms)
21/06/23 19:32:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Failed to get main class in JAR with error 'D:\Spark\spark-3.1.2-bin-hadoop3.2\... (Accès refusé)'. Please specify one with --class.
at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I tried looking for a solution but nothing worked. I don't know what to pass as an argument to the --class they tell me to add, and it says "Accès refusé", which means access denied; I don't understand this, so could someone tell me what to do?
- PS: the environment variables are all in place and working perfectly, so I don't think the problem comes from there.
Similar error (and same answer): Spark Kafka Data Consuming Package
Did you literally write ... after the --packages option?

The error is telling you to provide a .py file, or --class together with a JAR file that contains your application code.

If you did provide one, then it appears the Spark user cannot access the D:\ drive path you gave, and you may need to use winutils chmod to fix the permissions on that path.
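For illustration only, the complete command would look roughly like this: the trailing ... in the integration guide stands for your own application, so you replace it with the path to your script (D:\path\to\stream_app.py below is a hypothetical placeholder, not a file from the original post):

# D:\path\to\stream_app.py is a placeholder for your own PySpark script
.\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 D:\path\to\stream_app.py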
If you want to run the code in Jupyter, you can also add --packages there:
import os

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.2'
# Must be set before pyspark/findspark are imported and initialized
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

import findspark
import pyspark
findspark.init()
...
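To check that the package is picked up, a minimal sketch of what could follow in the notebook (assuming the environment-variable cell above has already run; it reuses the readStream options from the question and adds a console sink purely for testing, which is not part of the original answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kafka-test") \
    .master("local[*]") \
    .getOrCreate()

# This should no longer raise "Failed to find data source: kafka"
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.1.3:9092") \
    .option("subscribe", "test_spark") \
    .option("startingOffsets", "latest") \
    .load()

# Dump the raw key/value pairs to the notebook console to confirm messages arrive
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()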
Or use findspark.add_packages():
- https://github.com/minrk/findspark/pull/11
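A minimal sketch of that approach (assuming a findspark release that already includes add_packages, per the pull request above; the package coordinates are the same ones used with --packages):

import findspark

# Register the Kafka connector before Spark starts;
# add_packages must be called before findspark.init()
findspark.add_packages(['org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2'])
findspark.init()

import pyspark
from pyspark.sql import SparkSession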