UPSERT 在镶木地板 Pyspark 中

UPSERT in parquet Pyspark

我在 s3 中有以下分区的镶木地板文件: 年/月/日/some_id 使用 Spark (PySpark),每天我都想对最后 14 天 进行 UPSERT - 我想替换 s3 中的现有数据(每个分区一个镶木地板文件),但是不要删除 14 天之前的日期.. 我尝试了两种保存模式: append - 不好,因为它只是添加了另一个文件。 覆盖 - 正在删除过去的数据和其他分区的数据。

有什么方法或最佳实践可以克服这个问题吗?我应该从每个 运行 中的 s3 读取所有数据,然后再写回吗?也许重命名文件以便 append 将替换 s3 中的当前文件?

非常感谢!

据我所知,S3 没有更新操作。对象一旦添加到 s3 就无法修改。 (您必须替换另一个对象或附加文件)

无论如何你担心你必须读取所有数据,你可以指定你想要读取的时间线,分区修剪有助于只读取时间线内的分区。

我经常做类似的事情。在我的例子中,我做了一个 ETL 并将一天的数据附加到 parquet 文件:

关键是要处理您要写入的数据(在我的例子中是实际日期),确保按 date 列进行分区并覆盖 当前的所有数据日期.

这将保留所有旧数据。例如:

(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    .option("replaceWhere", "2020-01-27")
    .save(uri)
)

您还可以查看 delta.io,它是 parquet 格式的扩展,提供了一些有趣的功能,例如 ACID 交易。

感谢大家提供有用的解决方案。 我最终使用了一些服务于我的用例的配置——在我编写 parquet 时使用 overwrite 模式,以及这个配置:

我添加了这个配置:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

使用此配置,spark 将只覆盖它有数据要写入的分区。所有其他(过去的)分区都保持不变 - 请参阅此处:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-dynamic-partition-inserts.html