如何在 Spark 运行 上制作 Scala databricks Notebook 更快、更高效
How to Make a Scala databricks Notebook on Spark Run Faster, More Performant
val df1= spark.read.format("delta").table("...100K_rows...")
val xform = udf ( (message: String) => {
// abstract transformation, ideally this comes from a .jar library
// such as: (abstract) https://github.com/cosmycx/scala_transformer
val t0 = System.currentTimeMillis
Thread.sleep(5)
System.currentTimeMillis - t0
}) // .xformText
spark.udf.register("xform", xform)
val df2= df1.withColumn("xformResult", xform($"SomeText"))
df2.write.format("delta")
.mode(SaveMode.Overwrite)
.saveAsTable("...")
如何才能使 运行 更快?
我尝试了什么:
- 增加 databricks spark 集群节点大小:DS3_v2 14GB 4 核对比 DS5_v2 56GB 16 核
- 增加databricks spark cluster workers的数量,driver plus: 3, 5, and 10 (same speed!?)
- 更改:spark.conf.set("spark.sql.shuffle.partitions", "auto") 或不同的值
结果始终在此范围内:10K 行为 1 分钟,100K 行为 8 分钟,无论更改如何。
对于 100K+,理想的结果应该少于 1 分钟。这甚至可以在 databricks spark 中实现吗?
如果重要的话,这个 运行s 在 Azure 中。
我缺少什么,还有其他要考虑的事情,试试看?谢谢
首先,这可能不是 spark 的用例。数据集很小,spark 优化器甚至不知道如何处理它。
发生的情况是文件很小并且由单个任务读取,spark 将转换识别为简单并在读取数据时应用它 - 所以您按顺序处理事情并等待 Thread.sleep 总共 8 分钟()
最好的想法可能是不要为此使用 spark - 但如果你确实尝试让 spark 进行洗牌(排序或其他)并确保你的代码 运行s 在洗牌之后那么你会有 x 个(默认为 200 个)分区,并且可以 运行 处理最多 x 个不同的任务(但同样 - 我不会为此使用 Spark)
转换是按顺序发生的,而不是并发的,因为 df1 DataFrame 只有一个分区。
重新分区初始 DataFrame,然后 运行 转换新的分区 DataFrame 显着提高速度和性能,至少提高 10 倍。
println(df1.rdd.getNumPartitions) // 1
val df2 = df1.repartition(20) // run transform on df2 (100K rows in 35 sec.)
val df1= spark.read.format("delta").table("...100K_rows...")
val xform = udf ( (message: String) => {
// abstract transformation, ideally this comes from a .jar library
// such as: (abstract) https://github.com/cosmycx/scala_transformer
val t0 = System.currentTimeMillis
Thread.sleep(5)
System.currentTimeMillis - t0
}) // .xformText
spark.udf.register("xform", xform)
val df2= df1.withColumn("xformResult", xform($"SomeText"))
df2.write.format("delta")
.mode(SaveMode.Overwrite)
.saveAsTable("...")
如何才能使 运行 更快?
我尝试了什么:
- 增加 databricks spark 集群节点大小:DS3_v2 14GB 4 核对比 DS5_v2 56GB 16 核
- 增加databricks spark cluster workers的数量,driver plus: 3, 5, and 10 (same speed!?)
- 更改:spark.conf.set("spark.sql.shuffle.partitions", "auto") 或不同的值
结果始终在此范围内:10K 行为 1 分钟,100K 行为 8 分钟,无论更改如何。
对于 100K+,理想的结果应该少于 1 分钟。这甚至可以在 databricks spark 中实现吗? 如果重要的话,这个 运行s 在 Azure 中。
我缺少什么,还有其他要考虑的事情,试试看?谢谢
首先,这可能不是 spark 的用例。数据集很小,spark 优化器甚至不知道如何处理它。
发生的情况是文件很小并且由单个任务读取,spark 将转换识别为简单并在读取数据时应用它 - 所以您按顺序处理事情并等待 Thread.sleep 总共 8 分钟()
最好的想法可能是不要为此使用 spark - 但如果你确实尝试让 spark 进行洗牌(排序或其他)并确保你的代码 运行s 在洗牌之后那么你会有 x 个(默认为 200 个)分区,并且可以 运行 处理最多 x 个不同的任务(但同样 - 我不会为此使用 Spark)
转换是按顺序发生的,而不是并发的,因为 df1 DataFrame 只有一个分区。 重新分区初始 DataFrame,然后 运行 转换新的分区 DataFrame 显着提高速度和性能,至少提高 10 倍。
println(df1.rdd.getNumPartitions) // 1
val df2 = df1.repartition(20) // run transform on df2 (100K rows in 35 sec.)