PySpark works in terminal but not when executed in Python code

I am trying to read an Avro file. Below is a sample data source I found online for testing my code:

https://github.com/Teradata/kylo/blob/master/samples/sample-data/avro/userdata1.avro

Below is my code (please assume source_path is the path to the data linked above):

from pyspark.sql import SparkSession

def avro_reader(source_path: str):

    spark = SparkSession \
        .builder \
        .master("yarn") \
        .enableHiveSupport() \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .getOrCreate()

    reader = spark.read.format("com.databricks.spark.avro").load(source_path)

    return reader.show()

print(avro_reader(source_path))

Below is the error I receive:

Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
        at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:631)
        at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:271)
        at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:234)
        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:119)
        at org.apache.spark.deploy.SparkSubmit$$anon$$anon.<init>(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit$$anon.parseArguments(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
        at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Spark works perfectly fine when I run pyspark in the terminal, so I am not sure what is causing this issue. Below is the output of running pyspark in the terminal:

Python 3.8.2 (default, Apr  8 2021, 23:19:18) 
[Clang 12.0.5 (clang-1205.0.22.9)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
21/06/15 01:15:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.2 (default, Apr  8 2021 23:19:18)

Following the suggestion to remove .master("yarn"), here is the error:

Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
21/06/15 12:40:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "main.py", line 88, in <module>
    print(avro_reader('userdata1.avro'))
  File "main.py", line 26, in avro_reader
    reader = spark.read.format("com.databricks.spark.avro").load(source_path)
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 204, in load
    return self._df(self._jreader.load(path))
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o36.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroFileFormat.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource(DataSource.scala:666)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource(DataSource.scala:666)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
        ... 14 more

If you are running locally rather than on a YARN cluster, remove .master("yarn"). If you are running on a YARN cluster, you need to set up the environment correctly and follow the documentation on submitting to YARN.
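
For the YARN case, a minimal sketch of setting the variable from Python (the config path below is a placeholder; point it at the directory holding your cluster's core-site.xml and yarn-site.xml, or export the variable in your shell before launching Python, which works equally well):

import os
from pyspark.sql import SparkSession

# Hypothetical path -- replace with your cluster's Hadoop/YARN config directory.
# It must be set before the first SparkSession is created, because PySpark
# launches the JVM (via spark-submit) at that point and inherits the environment.
os.environ["HADOOP_CONF_DIR"] = "/path/to/hadoop/conf"

spark = SparkSession.builder.master("yarn").getOrCreate()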

Update after the new error message:

  1. You need to change "com.databricks.spark.avro" to "avro", since Avro is now supported by Spark itself.
  2. And you need to submit the job with the correct library attached (doc); see the sketch after the command below:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:<spark_version>
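
Putting both points together, here is a minimal sketch of a working reader. It runs locally (swap in "yarn" once the environment above is set up), and it pins spark-avro to 3.1.2 only because that is the Spark version shown in the terminal output; match it to your own installation:

from pyspark.sql import SparkSession

def avro_reader(source_path: str):
    # spark.jars.packages pulls the built-in Avro connector at session start,
    # the in-code equivalent of spark-submit --packages. The artifact version
    # must match your Spark version (3.1.2 here) and Scala build (2.12 for Spark 3.x).
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.2") \
        .getOrCreate()

    # "avro" is the short name for the built-in org.apache.spark.sql.avro source.
    df = spark.read.format("avro").load(source_path)
    df.show()
    # Return the DataFrame rather than show()'s None so callers can keep using it.
    return df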