pyspark df.write fails on file creation
When writing the dataframe, pyspark creates the output directory and a temporary directory inside it, but no data files. The csv and parquet formats return similar errors. It fails with:
```
Py4JJavaError: An error occurred while calling o99.save: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.$anonfun$applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write(FileFormatWriter.scala:240)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:240)
... 41 more
```
My code:
```
import pyspark.sql

output_path = ".data/output/FireIncidents/"
# fire_df.write.format("parquet").mode("overwrite").save(output_path)  # fails, makes folder but no files.
# fire_df.write.parquet(output_path, mode='overwrite')  # fails, makes folder but no files.
# fire_df.write.csv(output_path, mode='overwrite')  # fails, makes folder but no files.
# fire_df.write.save(output_path, format='parquet')  # fails, makes folder but no files.
# fire_df.write.format("parquet").saveAsTable("tab1")  # fails, makes folder but no files.
# fire_df.select("incident_number", "call_datetime", "incident_address").write.save("namesAndFavColors.parquet")  # from the documentation, same effect
df_writer = pyspark.sql.DataFrameWriter(fire_df)
df_writer = df_writer.saveAsTable('test', format='parquet', mode='overwrite', path=output_path)

type(fire_df) == pyspark.sql.dataframe.DataFrame
fire_df.select('incident_number', 'call_datetime', 'incident_address').show()
```

```
+---------------+--------------------+--------------------+
|incident_number|       call_datetime|    incident_address|
+---------------+--------------------+--------------------+
|     18-0032836|2018/09/06 12:13:...|  16888 RIVERVIEW ST|
|     19-0019239|2019/06/03 06:46:...| 18469 GREENVIEW AVE|
|     20-0010724|2020/04/05 10:44:...|        2930 CODY ST|
```
etc.
Documentation:
I have tried all of the variations above, with multiple formats and more than one version of Hadoop:
- HADOOP_HOME == "c:\hadoop"
- hadoop 3.2.1 and/or 3.2.2 (tried both)
- pyspark 3.2.0

A similar SO question that did not resolve it:
pyspark creates output file as folder
(Note the comment where the asker points out that the created directory is empty.)
It looks like the Windows native IO libraries are not present.
> Hadoop requires native libraries on Windows to work properly -that includes to access the file:// filesystem, where Hadoop uses some Windows APIs to implement posix-like file access permissions.
> This is implemented in HADOOP.DLL and WINUTILS.EXE.
> In particular, %HADOOP_HOME%\BIN\WINUTILS.EXE must be locatable.
> If it is not, Hadoop or an application built on top of Hadoop will fail.

See https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
The fact that you are not getting the `java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.` error tells me that you already have the `HADOOP_HOME` environment variable set.
If the `HADOOP_HOME` directory does *not* have a `bin` subdirectory containing the `winutils.exe` and `hadoop.dll` files, IIRC, you get a different error.
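If you want to double-check that part, here is a minimal sketch (my own addition, not from the original post; `c:\hadoop` is assumed as the fallback, matching your setup) that reports whether the two binaries exist and whether the `bin` folder is on `PATH`:

```
import os
from pathlib import Path

# Sanity check for the Windows/Hadoop native setup described above.
hadoop_home = os.environ.get("HADOOP_HOME", r"c:\hadoop")
bin_dir = Path(hadoop_home) / "bin"

for name in ("winutils.exe", "hadoop.dll"):
    status = "found" if (bin_dir / name).exists() else "MISSING"
    print(f"{bin_dir / name}: {status}")

# hadoop.dll is resolved via the DLL search path, so bin_dir should also be on PATH.
path_entries = [Path(p) for p in os.environ.get("PATH", "").split(os.pathsep) if p]
print(f"{bin_dir} on PATH: {bin_dir in path_entries}")
```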
Tangent/FYI: when I was trying to figure out where to get those two Windows binaries, I tried these three locations:
- https://github.com/steveloughran/winutils/ (goes up to v3.0.0)
- https://github.com/cdarlint/winutils/ (newer; I'm on PySpark 3.2.1 and picked the 3.2.2 build (the most recent as of 2022-01-29).)
- https://github.com/kontext-tech/winutils/ (this one has even newer versions, but I ended up not using them)
So, based on the above, I think what you are missing is the `$env:HADOOP_HOME\bin` folder on your `PATH`. I ran the following in an elevated PowerShell prompt and then restarted my environment (VS Code). After that, I was able to save Parquet files as expected.
```
[Environment]::SetEnvironmentVariable("PATH", "$env:PATH;$env:HADOOP_HOME\bin", "Machine")
```
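If you cannot (or would rather not) change the machine-level `PATH`, setting both variables just for the current Python process, before the first `SparkSession` is created, may also work. A minimal sketch, assuming Hadoop is unpacked at `c:\hadoop` with the binaries in its `bin` folder:

```
import os

# Assumption: winutils.exe and hadoop.dll live in c:\hadoop\bin.
os.environ["HADOOP_HOME"] = r"c:\hadoop"
os.environ["PATH"] = r"c:\hadoop\bin" + os.pathsep + os.environ.get("PATH", "")

# The environment must be set before the JVM is launched,
# i.e. before the first SparkSession is created.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```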
And one last bit of context, here is the Python code I used to test with; there is nothing special about the .csv:
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read a local CSV with a header row and write it back out as Parquet.
sdf = spark.read.options(header=True).csv("baseline-test.csv")
sdf.write.mode("overwrite").parquet("baseline-test-parquet")
```
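As a quick check that the write actually produced data files rather than just an empty folder, you could list the output directory afterwards. A small sketch (the directory name matches the test code above):

```
from pathlib import Path

# A successful write leaves part-*.parquet files plus a _SUCCESS marker,
# not an empty folder.
out_dir = Path("baseline-test-parquet")
print(sorted(p.name for p in out_dir.iterdir()))
```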