如何使用 PySpark 从数据框中获取 1000 条记录并写入文件?
How to get 1000 records from dataframe and write into a file using PySpark?
我在数据框中有超过 100,000 条记录。我想动态创建一个文件并为每个文件推送 1000 条记录。谁能帮我解决这个问题,先谢谢了。
首先,创建一个行号列
df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))
现在,运行循环并保存记录。
for i in range(0, df.count(), 1000):
records = df.where(F.col("row_num").between(i, i+999))
records.toPandas().to_csv("file-{}.csv".format(i))
您可以在写入 dataframe
时使用 maxRecordsPerFile
选项。
- 如果你需要整个数据帧在每个文件中写入1000条记录然后使用
repartition(1)
(or)
为每个分区写入1000条记录 使用 .coalesce(1)
Example:
# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()
df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)
Method-2:
Caluculating number of partitions then repartition the dataframe:
df = spark.range(10000)
#caluculate partitions
no_partitions=df.count()/1000
from pyspark.sql.functions import *
#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()
#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#| 1| 1001|
#| 6| 1000|
#| 3| 999|
#| 5| 1000|
#| 9| 1000|
#| 4| 999|
#| 8| 1000|
#| 7| 1000|
#| 2| 1001|
#| 0| 1000|
#+-----------+--------+
df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)
我在数据框中有超过 100,000 条记录。我想动态创建一个文件并为每个文件推送 1000 条记录。谁能帮我解决这个问题,先谢谢了。
首先,创建一个行号列
df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))
现在,运行循环并保存记录。
for i in range(0, df.count(), 1000):
records = df.where(F.col("row_num").between(i, i+999))
records.toPandas().to_csv("file-{}.csv".format(i))
您可以在写入 dataframe
时使用 maxRecordsPerFile
选项。
- 如果你需要整个数据帧在每个文件中写入1000条记录然后使用
repartition(1)
(or)
为每个分区写入1000条记录 使用.coalesce(1)
Example:
# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()
df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)
Method-2:
Caluculating number of partitions then repartition the dataframe:
df = spark.range(10000)
#caluculate partitions
no_partitions=df.count()/1000
from pyspark.sql.functions import *
#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()
#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#| 1| 1001|
#| 6| 1000|
#| 3| 999|
#| 5| 1000|
#| 9| 1000|
#| 4| 999|
#| 8| 1000|
#| 7| 1000|
#| 2| 1001|
#| 0| 1000|
#+-----------+--------+
df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)