我的 spark 应用程序太慢了,我怎样才能显着提高速度?
My spark app is too slow, how can I increase the speed significantly?
这是我的 spark 代码的一部分,它非常慢。慢我的意思是对于 7000 万个数据行,运行 代码需要将近 7 分钟,但如果可能的话,我需要它在 5 秒内 运行。我有一个集群,有 5 个 80 个内核和 177 GB 内存的 spark 节点,目前使用了 33Gb。
range_expr = col("created_at").between(
datetime.now()-timedelta(hours=timespan),
datetime.now()-timedelta(hours=time_delta(timespan))
)
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()
#article_ids.join(axes,article_ids.article==axes.article)
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
.map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1]) \
.map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))
我认为尤其是这部分代码特别慢:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()
当放入 spark 时,它会转换成这个,我认为这会导致它变慢:
javaToPython at NativeMethodAccessorImpl.java:-2
任何帮助将不胜感激。谢谢
编辑
最大的速度问题似乎是 JavatoPython。附图只是我的部分数据,已经很慢了。
编辑 (2)
关于len(x1)>=2
:
抱歉说了这么久,但我真的希望我能解决这个问题,所以让人们详细了解一个相当复杂的问题是至关重要的:
这是我的 rdd 示例:
rdd1 = [(1,3),(1,5),(1,6),(1,9),(2,10),(2,76),(3,8), (4,87),(4,96),(4,109),(5,10),(6,19),(6,18),(6,65),(6,43),(6,81 ),(7,12),(7,96),(7,452),(8,59)]
经过spark改造后的rdd1有这样的形式:
rdd_result = [(1,9),(2,76),(4,109),(6,81),(7,452)]
结果不包含 (3,8),(5,10) 因为键 3 或 5 只出现一次,我不希望 3 或 5 出现。
下面是我的程序:
首先:rdd1 reduceByKey 然后结果是:
rdd_reduceByKey=[(1,[3,5,6,9]),(2,[10,76]),(3,[8]),(4,[87, 96,109]),(5,[10]),(6,[19,18,65,43,81]),(7,[12,96,452,59]))]
第二个:rdd_reduceByKey 按 len(x1)>=2 过滤,结果为:
rdd_filter=[(1,[3,5,6,9]),(2,[10,76]),(4,[87,96,109]),(6,[ 19,18,65,43,81]),(7,[12,96,452,59]))]
所以 len(x1)>=2 是必要的但是很慢。
任何建议的改进都将不胜感激。
如果我遇到性能问题,我会做一些事情。
- 检查火花 web UI。找到最慢的部分。
- lambda 函数真的很可疑
- 检查执行器配置
- 在中间存储一些数据table。
- 如果在 parquet 中存储数据有帮助,请比较结果。
- 比较 if 使用 Scala 的帮助
编辑:
如果 JavatoPython 最慢,则使用 Scala 而不是 Python 可以解决问题。
这是查找 latest/largest 的代码。应该是NlogN,很可能接近N,因为排序是在小数据集上进行的。
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
val data = Seq((1,3),(1,5),(1,6),(1,9),(2,10),
(2,76),(3,8),(4,87),(4,96),(4,109),
(5,10),(6,19),(6,18),(6,65),(6,43),
(6,81),(7,12),(7,96),(7,452),(8,59))
val df = sqlContext.createDataFrame(data)
val dfAgg = df.groupBy("_1").agg(collect_set("_2").alias("_2"))
val udfFirst= udf[Int, WrappedArray[Int]](_.head)
val dfLatest = dfAgg.filter(size($"_2") > 1).
select($"_1", udfFirst(sort_array($"_2", asc=false)).alias("latest"))
dfLatest.show()
这是我的 spark 代码的一部分,它非常慢。慢我的意思是对于 7000 万个数据行,运行 代码需要将近 7 分钟,但如果可能的话,我需要它在 5 秒内 运行。我有一个集群,有 5 个 80 个内核和 177 GB 内存的 spark 节点,目前使用了 33Gb。
range_expr = col("created_at").between(
datetime.now()-timedelta(hours=timespan),
datetime.now()-timedelta(hours=time_delta(timespan))
)
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()
#article_ids.join(axes,article_ids.article==axes.article)
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
.map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1]) \
.map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))
我认为尤其是这部分代码特别慢:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()
当放入 spark 时,它会转换成这个,我认为这会导致它变慢:
javaToPython at NativeMethodAccessorImpl.java:-2
任何帮助将不胜感激。谢谢
编辑
最大的速度问题似乎是 JavatoPython。附图只是我的部分数据,已经很慢了。
编辑 (2)
关于len(x1)>=2
:
抱歉说了这么久,但我真的希望我能解决这个问题,所以让人们详细了解一个相当复杂的问题是至关重要的:
这是我的 rdd 示例:
rdd1 = [(1,3),(1,5),(1,6),(1,9),(2,10),(2,76),(3,8), (4,87),(4,96),(4,109),(5,10),(6,19),(6,18),(6,65),(6,43),(6,81 ),(7,12),(7,96),(7,452),(8,59)]
经过spark改造后的rdd1有这样的形式: rdd_result = [(1,9),(2,76),(4,109),(6,81),(7,452)] 结果不包含 (3,8),(5,10) 因为键 3 或 5 只出现一次,我不希望 3 或 5 出现。
下面是我的程序:
首先:rdd1 reduceByKey 然后结果是:
rdd_reduceByKey=[(1,[3,5,6,9]),(2,[10,76]),(3,[8]),(4,[87, 96,109]),(5,[10]),(6,[19,18,65,43,81]),(7,[12,96,452,59]))]
第二个:rdd_reduceByKey 按 len(x1)>=2 过滤,结果为:
rdd_filter=[(1,[3,5,6,9]),(2,[10,76]),(4,[87,96,109]),(6,[ 19,18,65,43,81]),(7,[12,96,452,59]))]
所以 len(x1)>=2 是必要的但是很慢。
任何建议的改进都将不胜感激。
如果我遇到性能问题,我会做一些事情。
- 检查火花 web UI。找到最慢的部分。
- lambda 函数真的很可疑
- 检查执行器配置
- 在中间存储一些数据table。
- 如果在 parquet 中存储数据有帮助,请比较结果。
- 比较 if 使用 Scala 的帮助
编辑:
如果 JavatoPython 最慢,则使用 Scala 而不是 Python 可以解决问题。
这是查找 latest/largest 的代码。应该是NlogN,很可能接近N,因为排序是在小数据集上进行的。
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
val data = Seq((1,3),(1,5),(1,6),(1,9),(2,10),
(2,76),(3,8),(4,87),(4,96),(4,109),
(5,10),(6,19),(6,18),(6,65),(6,43),
(6,81),(7,12),(7,96),(7,452),(8,59))
val df = sqlContext.createDataFrame(data)
val dfAgg = df.groupBy("_1").agg(collect_set("_2").alias("_2"))
val udfFirst= udf[Int, WrappedArray[Int]](_.head)
val dfLatest = dfAgg.filter(size($"_2") > 1).
select($"_1", udfFirst(sort_array($"_2", asc=false)).alias("latest"))
dfLatest.show()