Spark 12 GB 数据加载 Window 函数性能问题
Spark 12 GB data load with Window function Performance issue
我正在使用 sparksql 转换 12 GB data.My 转换是在其中一个字段上应用行号函数和分区依据,然后将数据分成两组,第一组行号为 1,第二组包括其余然后将数据写入 30 个分区中的目标位置。
我的工作目前大约需要 1 hour.I 想 运行 在不到 10 分钟内完成。
我运行在具有规格(16 核和 32 GB RAM)的 3 节点集群上执行此作业。
节点 1 yarn 主节点。
节点 2 两个执行器 1 个驱动程序和 1 个其他
节点 3 两个执行器都用于处理。
每个执行器分配 5 个内核和 10GB 内存。
我的硬件够用还是需要更强大的硬件?
执行者配置是否正确?
如果硬件和配置都很好,那么我肯定需要改进我的代码。
我的代码如下
sqlContext=SQLContext(sc)
SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()
SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))
initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
sc.stop()
在此先感谢您的帮助
你写之前有一个coalesce(1)
,这是什么原因? Coalesce 减少了该阶段的并行化,在您的情况下,这将导致窗口查询在 1 个核心上 运行,因此您失去了每个节点 16 个核心的好处。
删除合并,这应该会开始改进。
以下是我们为提高代码性能而实施的更改。
我们删除了 coalesce 并使用了 repartition(50)。我们尝试了括号中更高和更低的数字,但 50 是我们案例中的优化数字。
我们使用 s3 作为我们的目标,但由于在 spark 中重命名,这让我们付出了很多代价,所以我们改用 HDFS,我们的工作时间减少到以前的一半。
总的来说,我们的代码 运行 12 分钟之前是 50 分钟。
谢谢
阿马尔
我正在使用 sparksql 转换 12 GB data.My 转换是在其中一个字段上应用行号函数和分区依据,然后将数据分成两组,第一组行号为 1,第二组包括其余然后将数据写入 30 个分区中的目标位置。
我的工作目前大约需要 1 hour.I 想 运行 在不到 10 分钟内完成。
我运行在具有规格(16 核和 32 GB RAM)的 3 节点集群上执行此作业。 节点 1 yarn 主节点。 节点 2 两个执行器 1 个驱动程序和 1 个其他 节点 3 两个执行器都用于处理。 每个执行器分配 5 个内核和 10GB 内存。
我的硬件够用还是需要更强大的硬件? 执行者配置是否正确? 如果硬件和配置都很好,那么我肯定需要改进我的代码。
我的代码如下
sqlContext=SQLContext(sc)
SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()
SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))
initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
sc.stop()
在此先感谢您的帮助
你写之前有一个coalesce(1)
,这是什么原因? Coalesce 减少了该阶段的并行化,在您的情况下,这将导致窗口查询在 1 个核心上 运行,因此您失去了每个节点 16 个核心的好处。
删除合并,这应该会开始改进。
以下是我们为提高代码性能而实施的更改。
我们删除了 coalesce 并使用了 repartition(50)。我们尝试了括号中更高和更低的数字,但 50 是我们案例中的优化数字。 我们使用 s3 作为我们的目标,但由于在 spark 中重命名,这让我们付出了很多代价,所以我们改用 HDFS,我们的工作时间减少到以前的一半。 总的来说,我们的代码 运行 12 分钟之前是 50 分钟。 谢谢 阿马尔