pyspark 以 reduced/compressed 个小文件写入配置单元 table

pyspark write to hive table in reduced/compressed number of small files

每次进程运行时我都会更新一个数据帧记录,这意味着每次进程完成时我都会有一个一行 4 列的数据帧。 然后我将使用数据帧写入和镶木地板格式将其插入到配置单元 table 中。 由于一次一条记录,我在 hfds 的 table 文件夹中看到了很多小文件。

当我将数据写入配置单元时,您能否告诉我如何减少并将其写入同一个文件(parquet 文件)table??

hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

如何将 parquet 文件的数量减少到固定的 less 文件数量,以及 load/write 将新数据添加到现有文件中?? part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

在写入 HDFS 之前,您可以 repartition(1) 以便每次执行创建 1 个文件。

df.repartition(1).write.parquet("<directory>")

Merging files:

Using Hive:

如果您已经在 user_id/employe_db/market_table/ 目录顶部有配置单元 table,那么 运行 通过选择相同的 table.

插入覆盖
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")

--只创建一个文件然后使用 order by

spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")

您也可以 运行 插入语句,就像在 Hive 中一样。

(或)

Using Spark:

作为 post 摄取过程,您可以再次 从目录中读取 parquet 文件 然后再次重新分区并写入目录。

df_src=spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")

NOTE

  • overwrite 首先删除目录,以防万一作业失败我们可能会遇到数据丢失。
  • 最佳做法是将数据备份到 tmp 目录,然后只覆盖