UPSERT 在镶木地板 Pyspark 中
UPSERT in parquet Pyspark
我在 s3 中有以下分区的镶木地板文件:
年/月/日/some_id
使用 Spark (PySpark),每天我都想对最后 14 天 进行 UPSERT - 我想替换 s3 中的现有数据(每个分区一个镶木地板文件),但是不要删除 14 天之前的日期..
我尝试了两种保存模式:
append - 不好,因为它只是添加了另一个文件。
覆盖 - 正在删除过去的数据和其他分区的数据。
有什么方法或最佳实践可以克服这个问题吗?我应该从每个 运行 中的 s3 读取所有数据,然后再写回吗?也许重命名文件以便 append 将替换 s3 中的当前文件?
非常感谢!
据我所知,S3 没有更新操作。对象一旦添加到 s3 就无法修改。 (您必须替换另一个对象或附加文件)
无论如何你担心你必须读取所有数据,你可以指定你想要读取的时间线,分区修剪有助于只读取时间线内的分区。
我经常做类似的事情。在我的例子中,我做了一个 ETL 并将一天的数据附加到 parquet 文件:
关键是要处理您要写入的数据(在我的例子中是实际日期),确保按 date
列进行分区并覆盖 当前的所有数据日期.
这将保留所有旧数据。例如:
(
sdf
.write
.format("parquet")
.mode("overwrite")
.partitionBy("date")
.option("replaceWhere", "2020-01-27")
.save(uri)
)
您还可以查看 delta.io,它是 parquet 格式的扩展,提供了一些有趣的功能,例如 ACID 交易。
感谢大家提供有用的解决方案。
我最终使用了一些服务于我的用例的配置——在我编写 parquet 时使用 overwrite 模式,以及这个配置:
我添加了这个配置:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
使用此配置,spark 将只覆盖它有数据要写入的分区。所有其他(过去的)分区都保持不变 - 请参阅此处:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-dynamic-partition-inserts.html
我在 s3 中有以下分区的镶木地板文件: 年/月/日/some_id 使用 Spark (PySpark),每天我都想对最后 14 天 进行 UPSERT - 我想替换 s3 中的现有数据(每个分区一个镶木地板文件),但是不要删除 14 天之前的日期.. 我尝试了两种保存模式: append - 不好,因为它只是添加了另一个文件。 覆盖 - 正在删除过去的数据和其他分区的数据。
有什么方法或最佳实践可以克服这个问题吗?我应该从每个 运行 中的 s3 读取所有数据,然后再写回吗?也许重命名文件以便 append 将替换 s3 中的当前文件?
非常感谢!
据我所知,S3 没有更新操作。对象一旦添加到 s3 就无法修改。 (您必须替换另一个对象或附加文件)
无论如何你担心你必须读取所有数据,你可以指定你想要读取的时间线,分区修剪有助于只读取时间线内的分区。
我经常做类似的事情。在我的例子中,我做了一个 ETL 并将一天的数据附加到 parquet 文件:
关键是要处理您要写入的数据(在我的例子中是实际日期),确保按 date
列进行分区并覆盖 当前的所有数据日期.
这将保留所有旧数据。例如:
(
sdf
.write
.format("parquet")
.mode("overwrite")
.partitionBy("date")
.option("replaceWhere", "2020-01-27")
.save(uri)
)
您还可以查看 delta.io,它是 parquet 格式的扩展,提供了一些有趣的功能,例如 ACID 交易。
感谢大家提供有用的解决方案。 我最终使用了一些服务于我的用例的配置——在我编写 parquet 时使用 overwrite 模式,以及这个配置:
我添加了这个配置:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
使用此配置,spark 将只覆盖它有数据要写入的分区。所有其他(过去的)分区都保持不变 - 请参阅此处:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-dynamic-partition-inserts.html