pyspark df.write fails on file creation

When writing the dataframe, pyspark creates the output directory and a temporary directory inside it, but no data files. The csv and parquet formats return similar errors. It fails with:

```
Py4JJavaError: An error occurred while calling o99.save: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.$anonfun$applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write(FileFormatWriter.scala:240)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:240)
    ... 41 more
```

My code:

```
output_path = ".data/output/FireIncidents/"


# fire_df.write.format("parquet").mode("overwrite").save(output_path)  # fails, makes folder but no files.
# fire_df.write.parquet(output_path, mode='overwrite')  # fails, creates folder but no files.
# fire_df.write.csv(output_path, mode='overwrite')  # fails, makes folder but no files.
# fire_df.write.save(output_path, format='parquet')  # fails, makes folder but no files.
# fire_df.write.format("parquet").saveAsTable("tab1")  # fails, makes folder but no files.
# fire_df.select("incident_number", "call_datetime", "incident_address").write.save("namesAndFavColors.parquet")  # from the documentation, same effect
df_writer = pyspark.sql.DataFrameWriter(fire_df)
df_writer = df_writer.saveAsTable('test', format='parquet', mode='overwrite', path=output_path)

type(fire_df) == pyspark.sql.dataframe.DataFrame

fire_df.select('incident_number', 'call_datetime','incident_address').show()
```

```
+---------------+--------------------+--------------------+
|incident_number|       call_datetime|    incident_address|
+---------------+--------------------+--------------------+
|     18-0032836|2018/09/06 12:13:...|  16888 RIVERVIEW ST|
|     19-0019239|2019/06/03 06:46:...| 18469 GREENVIEW AVE|
|     20-0010724|2020/04/05 10:44:...|        2930 CODY ST|
```
etc. 

The documentation:


I have tried all of the variations above, with multiple formats and more than one version of Hadoop.

HADOOP_HOME == "c:\hadoop"

Hadoop 3.2.1 and/or 3.2.2 (tried both), pyspark 3.2.0

A similar SO question that was not resolved: pyspark creates output file as folder (note the comment where the asker points out that the created directory is empty).

It looks like the Windows native IO libraries are missing.

Hadoop requires native libraries on Windows to work properly; that includes access to the file:// filesystem, where Hadoop uses some Windows APIs to implement POSIX-like file access permissions.

This is implemented in HADOOP.DLL and WINUTILS.EXE.

In particular, %HADOOP_HOME%\BIN\WINUTILS.EXE must be locatable.

If it is not, Hadoop or an application built on top of Hadoop will fail.

https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems

winutils repo

The fact that you are not getting a java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. error tells me that you already have the HADOOP_HOME environment variable set.

If the HADOOP_HOME directory does not have a "bin" subdirectory containing the winutils.exe and hadoop.dll files, IIRC, you get a different error.
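
For what it's worth, a quick way to sanity-check that layout from Python (a minimal sketch; c:\hadoop is just the HADOOP_HOME value quoted above, so adjust it to your install):

```
import os

# Assumed install location based on HADOOP_HOME == "c:\hadoop"; adjust as needed.
hadoop_home = os.environ.get("HADOOP_HOME", r"c:\hadoop")
bin_dir = os.path.join(hadoop_home, "bin")

# Both binaries need to be present in %HADOOP_HOME%\bin.
for name in ("winutils.exe", "hadoop.dll"):
    path = os.path.join(bin_dir, name)
    print(f"{path}: {'found' if os.path.isfile(path) else 'MISSING'}")

# The UnsatisfiedLinkError in the traceback usually means hadoop.dll could not be
# loaded by the JVM, which in turn usually means this bin folder is not on PATH.
print("bin dir on PATH:", bin_dir.lower() in os.environ.get("PATH", "").lower())
```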

Tangent/FYI: when I was trying to figure out where to get those 2 Windows binaries, I tried these 3 locations:

  1. https://github.com/steveloughran/winutils/ (goes up to v3.0.0)
  2. https://github.com/cdarlint/winutils/ (newer; I'm on PySpark 3.2.1, so I picked the 3.2.2 build (the latest as of 2022-01-29).)
  3. https://github.com/kontext-tech/winutils/ (this has even newer versions, but I ended up not using them)

So, given the above, I think what you are missing is the $env:HADOOP_HOME\bin folder on your PATH. I ran the following in an elevated PowerShell prompt and then restarted my environment (VS Code). After that, I was able to save Parquet files as expected.

```
[Environment]::SetEnvironmentVariable("PATH", "$env:PATH;$env:HADOOP_HOME\bin", "Machine")
```
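
If you would rather not change the machine-wide PATH, a rough alternative (a sketch I have not tested on every setup) is to set the variables inside the Python process before the SparkSession, and therefore the JVM, gets created, so the launched JVM inherits them:

```
import os

# Assumed install location; must match where winutils.exe and hadoop.dll actually live.
os.environ["HADOOP_HOME"] = r"c:\hadoop"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]

# Start Spark only after adjusting the environment, so the JVM that PySpark
# launches inherits the updated PATH and can load hadoop.dll.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
```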

And for one final bit of context, here is the Python code I used to test; there is nothing special about the .csv.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.read.options(header=True).csv("baseline-test.csv")
sdf.write.mode("overwrite").parquet("baseline-test-parquet")
```
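
As a quick confirmation that the write actually produced data files this time (just a sketch, pointing at the same output directory as the test above), you should now see part-*.parquet files plus a _SUCCESS marker instead of the empty folder from the original symptom:

```
import os

# A successful write leaves part-*.parquet files and a _SUCCESS marker
# in the output directory rather than just an empty folder.
for name in sorted(os.listdir("baseline-test-parquet")):
    print(name)
```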