使用修改后的 PySpark DataFrame 覆盖现有的 Parquet 数据集
overwrite existing Parquet dataset with modified PySpark DataFrame
用例是将列附加到 Parquet 数据集,然后在同一位置高效地重写。这是一个最小的例子。
创建一个 pandas
DataFrame 并写入一个分区的 Parquet 数据集。
import pandas as pd
df = pd.DataFrame({
'id': ['a','a','a','b','b','b','b','c','c'],
'value': [0,1,2,3,4,5,6,7,8]})
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')
然后将 Parquet 数据集加载为 pyspark
视图并创建修改后的数据集作为 pyspark
DataFrame。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql(f"""SELECT id, value, 0 AS segment FROM data""")
此时 sf
数据与 df
数据相同,但多了一个全为零的 segment
列。我想用 sf
有效地覆盖 path
处的现有 Parquet 数据集作为同一位置的 Parquet 数据集。以下是不起作用的。也不希望将 sf
写入新位置、删除旧的 Parquet 数据集并重命名,因为这似乎效率不高。
# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)
我的简短回答:你不应该 :\
大数据(spark 是大数据)的一个原则是永远不要覆盖东西。当然,存在 .mode('overwrite')
,但这不是正确的用法。
我对它可能(应该)失败的原因的猜测:
- 您添加了一列,因此写入的数据集的格式与当前存储在那里的数据集的格式不同。这会造成模式混乱
- 您在处理时覆盖了输入数据。所以 spark 读取一些行,处理它们并覆盖输入文件。但是这些文件仍然是其他行要处理的输入。
在这种情况下我通常做的是创建另一个数据集,当没有理由保留旧数据集时(即处理完全完成时),清理它。要删除文件,您可以检查 。它应该适用于 spark 可访问的所有文件。但是它是在scala中,所以我不确定它是否可以适应pyspark。
请注意,效率不是覆盖的好理由,它做了更多的工作
简单的写。
用例是将列附加到 Parquet 数据集,然后在同一位置高效地重写。这是一个最小的例子。
创建一个 pandas
DataFrame 并写入一个分区的 Parquet 数据集。
import pandas as pd
df = pd.DataFrame({
'id': ['a','a','a','b','b','b','b','c','c'],
'value': [0,1,2,3,4,5,6,7,8]})
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')
然后将 Parquet 数据集加载为 pyspark
视图并创建修改后的数据集作为 pyspark
DataFrame。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql(f"""SELECT id, value, 0 AS segment FROM data""")
此时 sf
数据与 df
数据相同,但多了一个全为零的 segment
列。我想用 sf
有效地覆盖 path
处的现有 Parquet 数据集作为同一位置的 Parquet 数据集。以下是不起作用的。也不希望将 sf
写入新位置、删除旧的 Parquet 数据集并重命名,因为这似乎效率不高。
# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)
我的简短回答:你不应该 :\
大数据(spark 是大数据)的一个原则是永远不要覆盖东西。当然,存在 .mode('overwrite')
,但这不是正确的用法。
我对它可能(应该)失败的原因的猜测:
- 您添加了一列,因此写入的数据集的格式与当前存储在那里的数据集的格式不同。这会造成模式混乱
- 您在处理时覆盖了输入数据。所以 spark 读取一些行,处理它们并覆盖输入文件。但是这些文件仍然是其他行要处理的输入。
在这种情况下我通常做的是创建另一个数据集,当没有理由保留旧数据集时(即处理完全完成时),清理它。要删除文件,您可以检查
请注意,效率不是覆盖的好理由,它做了更多的工作 简单的写。