如何在 12 个 parquet 文件中自动执行 运行 相同聚合的过程,然后使用 PySpark 将结果加入 1 table 中?
How can I automate the process of running the same aggregation in 12 parquet files and then join the results in 1 table using PySpark?
我必须在包含 1 年数据(日级别)的镶木地板文件中进行 6 种不同的计算(按日计算总和和平均值)。问题是文件太大,Jupyter 在这个过程中崩溃了。所以我把文件分成了12个月(12个parquet文件)。我测试了服务器是否能够在合理的时间内对 1 个月的数据进行计算,结果确实如此。我想避免编写 72 个不同的查询(6 个计算 * 12 个月)。每次计算的结果都必须保存在镶木地板文件中,然后加入最终的 table。您建议如何通过在 PySpark 中自动化流程来解决这个问题?我将不胜感激任何建议。谢谢
这是我必须在数据的 12 个部分的每个部分中 运行 的代码示例:
month1= spark.read.parquet("s3://af/my_folder/month1.parquet")
month1.createOrReplaceTempView("month1")
month1sum= spark.sql("select id, date, sum(sessions) as sum_num_sessions from month1 where group by 1,2 order_by 1 asc")
month1sum.write.mode("overwrite").parquet("s3://af/my_folder/month1sum.parquet")
month1sum.createOrReplaceTempView("month1sum")
month_1_calculation=month1sum.groupBy('date').agg(avg('sum_num_sessions').alias('avg_sessions'))
month_1_calculation.write.mode("overwrite").parquet("s3://af/my_folder/month_1_calculation.parquet")```
快速方法:for 循环怎么样?
for i in range(1, 13):
month= spark.read.parquet(f"s3://af/my_folder/month{i}.parquet")
month.createOrReplaceTempView(f"month{i}")
monthsum= spark.sql(f"select id, date, sum(sessions) as sum_num_sessions from month{i} where group by 1,2 order_by 1 asc")
monthsum.write.mode("overwrite").parquet(f"s3://af/my_folder/month{i}sum.parquet")
monthsum.createOrReplaceTempView(f"month{i}sum")
month_calculation = monthsum.groupBy('date').agg(avg('sum_num_sessions').alias('avg_sessions'))
month_calculation.write.mode("overwrite").parquet(f"s3://af/my_folder/month_{i}_calculation.parquet")
长期的做法: Spark是为处理大数据而设计的,所以无论你的数据有多大,只要你有足够的硬件(核心数和内存) ), Spark 应该能够通过正确的配置来处理它。所以调整核心数量、执行程序内存、驱动程序内存、提高并行度(通过更改分区数量)……肯定可以解决您的问题。
我必须在包含 1 年数据(日级别)的镶木地板文件中进行 6 种不同的计算(按日计算总和和平均值)。问题是文件太大,Jupyter 在这个过程中崩溃了。所以我把文件分成了12个月(12个parquet文件)。我测试了服务器是否能够在合理的时间内对 1 个月的数据进行计算,结果确实如此。我想避免编写 72 个不同的查询(6 个计算 * 12 个月)。每次计算的结果都必须保存在镶木地板文件中,然后加入最终的 table。您建议如何通过在 PySpark 中自动化流程来解决这个问题?我将不胜感激任何建议。谢谢
这是我必须在数据的 12 个部分的每个部分中 运行 的代码示例:
month1= spark.read.parquet("s3://af/my_folder/month1.parquet")
month1.createOrReplaceTempView("month1")
month1sum= spark.sql("select id, date, sum(sessions) as sum_num_sessions from month1 where group by 1,2 order_by 1 asc")
month1sum.write.mode("overwrite").parquet("s3://af/my_folder/month1sum.parquet")
month1sum.createOrReplaceTempView("month1sum")
month_1_calculation=month1sum.groupBy('date').agg(avg('sum_num_sessions').alias('avg_sessions'))
month_1_calculation.write.mode("overwrite").parquet("s3://af/my_folder/month_1_calculation.parquet")```
快速方法:for 循环怎么样?
for i in range(1, 13):
month= spark.read.parquet(f"s3://af/my_folder/month{i}.parquet")
month.createOrReplaceTempView(f"month{i}")
monthsum= spark.sql(f"select id, date, sum(sessions) as sum_num_sessions from month{i} where group by 1,2 order_by 1 asc")
monthsum.write.mode("overwrite").parquet(f"s3://af/my_folder/month{i}sum.parquet")
monthsum.createOrReplaceTempView(f"month{i}sum")
month_calculation = monthsum.groupBy('date').agg(avg('sum_num_sessions').alias('avg_sessions'))
month_calculation.write.mode("overwrite").parquet(f"s3://af/my_folder/month_{i}_calculation.parquet")
长期的做法: Spark是为处理大数据而设计的,所以无论你的数据有多大,只要你有足够的硬件(核心数和内存) ), Spark 应该能够通过正确的配置来处理它。所以调整核心数量、执行程序内存、驱动程序内存、提高并行度(通过更改分区数量)……肯定可以解决您的问题。