PySpark 实木复合地板文件在转换后覆盖

PySpark parquet file overwriting after transformation

我正在尝试在 parquet 文件中进行转换(联合两个表,或添加一列)然后保存它,而 运行 进入 FileNotFound 错误。

重现我的错误的片段如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.appName('test').getOrCreate()

# create some data
a = spark.createDataFrame(pd.DataFrame({
    'x': [1, 2, 3], 'y':[1, 2, 3]
}))
a.write.mode('overwrite').parquet('./a')

# make some transformation onto the data
a = spark.read.parquet('./a')
b = spark.read.parquet('./a')
# such as Union, and the same error replacing the following by spark.sql()
c = a.union(b).withColumn('id', monotonically_increasing_id())
c.show()

# overwrite (WHERE I got the FileNotFound error)
# one can see the parquet-xxxxxx.snappy.parquet got suppressed when overwrite
c.write.mode('overwrite').parquet('./a')

错误:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 105.0 failed 1 times, most recent failure: Lost task 2.0 in stage 105.0 (TID 265) (suweiguodembp.home executor driver): java.io.FileNotFoundException: File file: /data-xxx-xxxxxxx.snappy.parquet

我的目标是为新应用程序修改数据中的某些行。但为什么它不能合并或插入列呢?为 parquet 文件追加记录的正确方法是什么

spark 的哲学是'not to overwrite the data'。 mode('overwrite') 要做的第一件事就是删除路径。由于您使用的是 相同的 路径,因此在重写之前您已经丢失了数据。您需要进行转换并写入另一个位置(例如 ./temp),然后在新位置进一步使用 'cleaned' 数据。 (即使你需要)你把它从另一个地方移回原来的地方(./a)。