避免分区使 Spark 不平衡

avoid partitions unbalancing Spark

我有一个 性能问题 我正在修改的代码,每次执行计数时都会给出 OOM。 我想我发现了问题,基本上在 keyBy 转换之后,正在执行 aggregateByKey. 问题在于,几乎 98% 的 RDD 元素具有相同的键,因此 aggregationByKey,生成 shuffle,将几乎所有记录放入同一分区,底线:只有少数执行者工作,并且有很大的内存压力。

这是代码:

val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
    .keyBy(po => po.getProcessCreator.name)
    .aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
    .map {case(name,list) =>
      val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
      val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
      lastOfGroupByKeys.flatMap(f => f._2)
    }
    .flatMap(f => f)

log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)

我想要一种以更并行的方式执行操作的方法,允许所有执行者几乎平等地工作。我该怎么做?

我必须使用自定义分区程序吗?

如果您的观察是正确的并且

98% of the RDD elements has the same key

那么更改分区程序对您根本没有帮助。根据分区程序的定义,98% 的数据必须由单个执行程序处理。

幸运的是,这里的错误代码可能是比偏斜更大的问题。跳过:

.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)

这只是一个民间魔术,看起来整个管道可以重写为一个简单的 reuceByKey。伪代码:

  • name 和本地键合并为一个键:

    def key(po: AnomalyPO) = (
      // "major" key
      po.getProcessCreator.name, 
      // "minor" key
      po.getPodId, po.getAnomalyCode,
      po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID
    )
    

    包含名称、日期和其他字段的键应该比单独的名称具有更高的基数。

  • Map to pairs and reduce by key:

    rddAnomalies
      .map(po => (key(po), po))
      .reduceByKey((x, y) => 
        if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y
      )