Spark/Scala这段代码的性能瓶颈在哪里?
Where is the performance bottleneck in this Spark/Scala code?
首先让我指出,我对 Spark 和 Scala 都很陌生。我一直在尝试通过迁移我过去完成的 Hadoop Map/Reduce 作业之一来调查承诺的 Spark 性能。这项工作在 Hadoop 上需要 14 分钟,使用 3x r3.2xlarge 机器输入 16 个压缩的 bzip 文件,每个文件 170mb。我尽我所能将它翻译成 Scala/Spark 成这样:
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object JsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
}
我一开始使用重新分区命令将分区设置为 60,因为我在某处读到每个内核有 2-3 个分区是好的。
我在相同的 3x r3.2xlarge 机器上运行这个 Spark 作业(每台机器有 8 个内核和 58G 可用)所以我按以下方式提交我的作业:
spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...)
同样的输入花费了 1 个多小时...我不确定问题是出在 Scala 还是 Spark 配置中,欢迎任何帮助。
此致,
奥古斯托
编辑 1: 某些操作的平均时间:
从 S3 读取文件:~ 2 分钟
平面地图:~ 11 分钟
reduceByKey:> 1 小时
使用的密钥是 S3 路径,因此它们可能会变得很长,不知道这是否会有所不同。
编辑 2: 我用 .reduceByKey((a,b) => a)
替换了 reduceByKey
函数,工作在 10 分钟内结束,所以 combine
函数
基于大部分时间花在 flatMap 之后的事实,我怀疑是 shuffle 拖慢了你的速度,而不是 CPU 利用率。您可能想尝试使用 运行 分区较少的作业。您可以尝试的另一件事是将 reduceByKey()
替换为 foldByKey()
,它是关联的但不是可交换的,这意味着当 运行 合并时它必须保持 RDD 顺序,这可能会转化为更少随机播放期间的网络流量。
这归结为我的菜鸟 Scala 编程技能 - 更改为以下更高性能的 Scala 仅需 15 分钟:
val conceptData = spark.textFile(inputPath).repartition(24)
val result = conceptData.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1, List(metrics._1+"\u200e"+ t._2.head)))))
.reduceByKey((a,b) => (a._1 + b._1, a._2:::b._2))
.map(t=> t._1 + "\t" + t._2._1 + "\t" + t._2._2.mkString("\u200f"))
它可能还可以进一步改进。总之,谢谢大家的帮助。
此致,
奥古斯托
首先让我指出,我对 Spark 和 Scala 都很陌生。我一直在尝试通过迁移我过去完成的 Hadoop Map/Reduce 作业之一来调查承诺的 Spark 性能。这项工作在 Hadoop 上需要 14 分钟,使用 3x r3.2xlarge 机器输入 16 个压缩的 bzip 文件,每个文件 170mb。我尽我所能将它翻译成 Scala/Spark 成这样:
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object JsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
}
我一开始使用重新分区命令将分区设置为 60,因为我在某处读到每个内核有 2-3 个分区是好的。 我在相同的 3x r3.2xlarge 机器上运行这个 Spark 作业(每台机器有 8 个内核和 58G 可用)所以我按以下方式提交我的作业:
spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...)
同样的输入花费了 1 个多小时...我不确定问题是出在 Scala 还是 Spark 配置中,欢迎任何帮助。
此致, 奥古斯托
编辑 1: 某些操作的平均时间:
从 S3 读取文件:~ 2 分钟
平面地图:~ 11 分钟
reduceByKey:> 1 小时
使用的密钥是 S3 路径,因此它们可能会变得很长,不知道这是否会有所不同。
编辑 2: 我用 .reduceByKey((a,b) => a)
替换了 reduceByKey
函数,工作在 10 分钟内结束,所以 combine
函数
基于大部分时间花在 flatMap 之后的事实,我怀疑是 shuffle 拖慢了你的速度,而不是 CPU 利用率。您可能想尝试使用 运行 分区较少的作业。您可以尝试的另一件事是将 reduceByKey()
替换为 foldByKey()
,它是关联的但不是可交换的,这意味着当 运行 合并时它必须保持 RDD 顺序,这可能会转化为更少随机播放期间的网络流量。
这归结为我的菜鸟 Scala 编程技能 - 更改为以下更高性能的 Scala 仅需 15 分钟:
val conceptData = spark.textFile(inputPath).repartition(24)
val result = conceptData.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1, List(metrics._1+"\u200e"+ t._2.head)))))
.reduceByKey((a,b) => (a._1 + b._1, a._2:::b._2))
.map(t=> t._1 + "\t" + t._2._1 + "\t" + t._2._2.mkString("\u200f"))
它可能还可以进一步改进。总之,谢谢大家的帮助。
此致,
奥古斯托