如何使用 PySpark 从数据框中获取 1000 条记录并写入文件?

How to get 1000 records from dataframe and write into a file using PySpark?

我在数据框中有超过 100,000 条记录。我想动态创建一个文件并为每个文件推送 1000 条记录。谁能帮我解决这个问题,先谢谢了。

首先,创建一个行号列

df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))

现在,运行循环并保存记录。

for i in range(0, df.count(), 1000):
   records = df.where(F.col("row_num").between(i, i+999))
   records.toPandas().to_csv("file-{}.csv".format(i))

您可以在写入 dataframe 时使用 maxRecordsPerFile 选项。

  • 如果你需要整个数据帧在每个文件中写入1000条记录然后使用repartition(1)(or)每个分区写入1000条记录 使用 .coalesce(1)

Example:

# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)

# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)

#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()

df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)

Method-2:

Caluculating number of partitions then repartition the dataframe:

df = spark.range(10000)

#caluculate partitions
no_partitions=df.count()/1000

from pyspark.sql.functions import *

#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()

#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#|          1|    1001|
#|          6|    1000|
#|          3|     999|
#|          5|    1000|
#|          9|    1000|
#|          4|     999|
#|          8|    1000|
#|          7|    1000|
#|          2|    1001|
#|          0|    1000|
#+-----------+--------+

df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)