如何改进我的 spark 应用程序的 reducebykey 部分?

How can I improve the reducebykey part of my spark app?

我有 64 个 spark 核心。我的 cassandra 集群中有超过 8000 万行数据,总计 4.2 GB。我现在需要 82 秒来处理这些数据。我希望这减少到 8 秒。对此有什么想法吗?这可能吗?谢谢

这是我想要改进的 spark 应用程序的一部分:

axes = sqlContext.read.format("org.apache.spark.sql.cassandra")\
    .options(table="axes", keyspace=source, numPartitions="192").load()\
    .repartition(64*3)\
    .reduceByKey(lambda x,y:x+y,52)\
    .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
    .filter(lambda x:len(x[1])>=2) \
    .map(lambda x:x[1][-1])

编辑:

这是我目前使用的代码 运行 上面发布的代码是一个实验,很抱歉造成混淆。上面的问题与这段代码有关。

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(64*3) \
                    .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)])).reduceByKey(lambda x,y:x+y)\
                    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
                    .filter(lambda x:len(x[1])>=2) \
                    .map(lambda x:x[1][-1])

谢谢

问题:

(假设未修改 Spark 分布,为什么这段代码无法正常工作)

一步一步:

  1. 这两行应该创建一个 Spark DataFrame。到目前为止一切顺利:

    sqlContext.read.format("org.apache.spark.sql.cassandra")
      .options(table="axes", keyspace=source, numPartitions="192").load()
    

    唯一可能的问题是 numPartitions,据我所知这不是公认的选项。

  2. 这几乎是一段垃圾代码。在不做任何实际工作的情况下四处乱放数据不太可能让你到任何地方。

    .repartition(64*3)
    
  3. 此时你切换到RDD。由于 Row 实际上是 tuple 的子类,并且 reduceByKey 可能仅适用于 每个元素必须是大小为 2 的元组。我不确定你为什么选择 52虽然分区。

    .reduceByKey(lambda x,y:x+y,52)
    
  4. 因为 reduceByKey 总是导致大小为 2 的元组的 RDD 下面的部分根本不应该工作

    .map(lambda x: (x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
    

    特别是 x 不能有像 articlecomments 这样的属性。而且这段代码

    [Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)] 
    

    创建大小为 1 的 list(见下文)。

    接下来的部分

    Row(article=x.article, ...)
    

    还有一个原因让人闻起来有腥味。如果有一些过时的列,则应在将数据转换为 RDD 之前将其过滤掉,以避免过多的流量并减少内存使用。如果没有过时的列,则没有理由通过创建新对象对 Python GC 施加更多压力。

  5. 因为 x[1] 只有一个元素排序所以没有意义:

    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
    
  6. 并且这个过滤器应该总是return一个空的RDD

    .filter(lambda x:len(x[1])>=2) \
    
  7. 这并没有执行任何有用的操作:

    .map(lambda x:x[1][-1])
    

总结:

如果您使用此代码的某个版本,问题中显示的顺序很可能会混淆并从点 4 开始映射:

.map(lambda x: (x.article,[Row(....)]))

先于 reduceByKey:

.reduceByKey(lambda x,y:x+y,52)

如果是这种情况,您实际使用的 要么等同于具有所有问题的 groupByKey (Python),要么效率较低 (Scala)。此外,它会减少高度可疑的分区数量。

如果是DataFrame -> RDD转换)对应的序列化-反序列化,即使有,也可以通过实际归约轻松解决 max 不是按键分组。

from operator import attrgetter

(sqlContext.read.format(...).options(...).load()
  .select(...)  # Only the columns you actually need
  .keyBy(attrgetter("article"))
  .reduceByKey(lambda r1, r2: max(r1, r2, key=attrgetter("y"))))

相关问题: