pyspark 以 reduced/compressed 个小文件写入配置单元 table
pyspark write to hive table in reduced/compressed number of small files
每次进程运行时我都会更新一个数据帧记录,这意味着每次进程完成时我都会有一个一行 4 列的数据帧。
然后我将使用数据帧写入和镶木地板格式将其插入到配置单元 table 中。
由于一次一条记录,我在 hfds 的 table 文件夹中看到了很多小文件。
当我将数据写入配置单元时,您能否告诉我如何减少并将其写入同一个文件(parquet 文件)table??
hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
如何将 parquet 文件的数量减少到固定的 less 文件数量,以及 load/write 将新数据添加到现有文件中??
part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
在写入 HDFS
之前,您可以 repartition(1)
以便每次执行创建 1 个文件。
df.repartition(1).write.parquet("<directory>")
Merging files:
Using Hive:
如果您已经在 user_id/employe_db/market_table/
目录顶部有配置单元 table,那么 运行 通过选择相同的 table.
插入覆盖
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")
--只创建一个文件然后使用 order by
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")
您也可以 运行 插入语句,就像在 Hive 中一样。
(或)
Using Spark:
作为 post 摄取过程,您可以再次 从目录中读取 parquet 文件 然后再次重新分区并写入目录。
df_src=spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")
NOTE
- overwrite 首先删除目录,以防万一作业失败我们可能会遇到数据丢失。
- 最佳做法是将数据备份到 tmp 目录,然后只覆盖
每次进程运行时我都会更新一个数据帧记录,这意味着每次进程完成时我都会有一个一行 4 列的数据帧。 然后我将使用数据帧写入和镶木地板格式将其插入到配置单元 table 中。 由于一次一条记录,我在 hfds 的 table 文件夹中看到了很多小文件。
当我将数据写入配置单元时,您能否告诉我如何减少并将其写入同一个文件(parquet 文件)table??
hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
如何将 parquet 文件的数量减少到固定的 less 文件数量,以及 load/write 将新数据添加到现有文件中?? part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
在写入 HDFS
之前,您可以 repartition(1)
以便每次执行创建 1 个文件。
df.repartition(1).write.parquet("<directory>")
Merging files:
Using Hive:
如果您已经在 user_id/employe_db/market_table/
目录顶部有配置单元 table,那么 运行 通过选择相同的 table.
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")
--只创建一个文件然后使用 order by
spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")
您也可以 运行 插入语句,就像在 Hive 中一样。
(或)
Using Spark:
作为 post 摄取过程,您可以再次 从目录中读取 parquet 文件 然后再次重新分区并写入目录。
df_src=spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")
NOTE
- overwrite 首先删除目录,以防万一作业失败我们可能会遇到数据丢失。
- 最佳做法是将数据备份到 tmp 目录,然后只覆盖