提高火花应用程序的速度
Improve speed of spark app
这是我的 python-spark 代码的一部分,它的某些部分 运行 对我的需要来说太慢了。
尤其是这部分代码,我真的很想提高它的速度,但不知道该怎么做。目前处理 6000 万行数据大约需要 1 分钟,我想将其缩短到 10 秒以下。
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
我的 spark 应用程序的更多上下文:
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
.map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1]) \
.map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))
非常感谢您的建议。
编辑:
Count占用大部分时间(50s)未加入
我也试过增加并行度但没有任何明显效果:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
和
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
首先,您应该弄清楚什么实际花费的时间最多。
例如确定读取数据需要多长时间
axes = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(table="axes", keyspace=source)
.load()
.count()
增加并行度或并行读取器的数量可能对此有所帮助,但前提是您没有最大化 Cassandra 集群的 IO。
其次,看看您是否可以使用 Dataframes 做所有事情 api。每次使用 python lambda 时,都会在 python 和 scala 类型之间产生序列化成本。
编辑:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
仅在加载完成后生效,因此这对您没有帮助。
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
不是 Spark Cassandra 连接器的有效参数,因此这不会执行任何操作。
见
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters
Input Split Size 决定了在一个 Spark 分区中放入多少个 C* 分区。
这是我的 python-spark 代码的一部分,它的某些部分 运行 对我的需要来说太慢了。 尤其是这部分代码,我真的很想提高它的速度,但不知道该怎么做。目前处理 6000 万行数据大约需要 1 分钟,我想将其缩短到 10 秒以下。
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
我的 spark 应用程序的更多上下文:
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
.map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1]) \
.map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))
非常感谢您的建议。
编辑:
Count占用大部分时间(50s)未加入
我也试过增加并行度但没有任何明显效果:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
和
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
首先,您应该弄清楚什么实际花费的时间最多。
例如确定读取数据需要多长时间
axes = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(table="axes", keyspace=source)
.load()
.count()
增加并行度或并行读取器的数量可能对此有所帮助,但前提是您没有最大化 Cassandra 集群的 IO。
其次,看看您是否可以使用 Dataframes 做所有事情 api。每次使用 python lambda 时,都会在 python 和 scala 类型之间产生序列化成本。
编辑:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
仅在加载完成后生效,因此这对您没有帮助。
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
不是 Spark Cassandra 连接器的有效参数,因此这不会执行任何操作。
见 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters Input Split Size 决定了在一个 Spark 分区中放入多少个 C* 分区。