Spark 12 GB 数据加载 Window 函数性能问题

Spark 12 GB data load with Window function Performance issue

我正在使用 sparksql 转换 12 GB data.My 转换是在其中一个字段上应用行号函数和分区依据,然后将数据分成两组,第一组行号为 1,第二组包括其余然后将数据写入 30 个分区中的目标位置。

我的工作目前大约需要 1 hour.I 想 运行 在不到 10 分钟内完成。

我运行在具有规格(16 核和 32 GB RAM)的 3 节点集群上执行此作业。 节点 1 yarn 主节点。 节点 2 两个执行器 1 个驱动程序和 1 个其他 节点 3 两个执行器都用于处理。 每个执行器分配 5 个内核和 10GB 内存。

我的硬件够用还是需要更强大的硬件? 执行者配置是否正确? 如果硬件和配置都很好,那么我肯定需要改进我的代码。

我的代码如下

sqlContext=SQLContext(sc)

SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()

SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))

initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')


initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')

sc.stop()

在此先感谢您的帮助

你写之前有一个coalesce(1),这是什么原因? Coalesce 减少了该阶段的并行化,在您的情况下,这将导致窗口查询在 1 个核心上 运行,因此您失去了每个节点 16 个核心的好处。

删除合并,这应该会开始改进。

以下是我们为提高代码性能而实施的更改。

我们删除了 coalesce 并使用了 repartition(50)。我们尝试了括号中更高和更低的数字,但 50 是我们案例中的优化数字。 我们使用 s3 作为我们的目标,但由于在 spark 中重命名,这让我们付出了很多代价,所以我们改用 HDFS,我们的工作时间减少到以前的一半。 总的来说,我们的代码 运行 12 分钟之前是 50 分钟。 谢谢 阿马尔