避免分区使 Spark 不平衡
avoid partitions unbalancing Spark
我有一个 性能问题 我正在修改的代码,每次执行计数时都会给出 OOM
。
我想我发现了问题,基本上在 keyBy
转换之后,正在执行 aggregateByKey.
问题在于,几乎 98% 的 RDD 元素具有相同的键,因此 aggregationByKey,生成 shuffle,将几乎所有记录放入同一分区,底线:只有少数执行者工作,并且有很大的内存压力。
这是代码:
val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
.keyBy(po => po.getProcessCreator.name)
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
.map {case(name,list) =>
val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
lastOfGroupByKeys.flatMap(f => f._2)
}
.flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)
我想要一种以更并行的方式执行操作的方法,允许所有执行者几乎平等地工作。我该怎么做?
我必须使用自定义分区程序吗?
如果您的观察是正确的并且
98% of the RDD elements has the same key
那么更改分区程序对您根本没有帮助。根据分区程序的定义,98% 的数据必须由单个执行程序处理。
幸运的是,这里的错误代码可能是比偏斜更大的问题。跳过:
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
这只是一个民间魔术,看起来整个管道可以重写为一个简单的 reuceByKey
。伪代码:
将 name
和本地键合并为一个键:
def key(po: AnomalyPO) = (
// "major" key
po.getProcessCreator.name,
// "minor" key
po.getPodId, po.getAnomalyCode,
po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID
)
包含名称、日期和其他字段的键应该比单独的名称具有更高的基数。
Map to pairs and reduce by key:
rddAnomalies
.map(po => (key(po), po))
.reduceByKey((x, y) =>
if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y
)
我有一个 性能问题 我正在修改的代码,每次执行计数时都会给出 OOM
。
我想我发现了问题,基本上在 keyBy
转换之后,正在执行 aggregateByKey.
问题在于,几乎 98% 的 RDD 元素具有相同的键,因此 aggregationByKey,生成 shuffle,将几乎所有记录放入同一分区,底线:只有少数执行者工作,并且有很大的内存压力。
这是代码:
val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
.keyBy(po => po.getProcessCreator.name)
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
.map {case(name,list) =>
val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
lastOfGroupByKeys.flatMap(f => f._2)
}
.flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)
我想要一种以更并行的方式执行操作的方法,允许所有执行者几乎平等地工作。我该怎么做?
我必须使用自定义分区程序吗?
如果您的观察是正确的并且
98% of the RDD elements has the same key
那么更改分区程序对您根本没有帮助。根据分区程序的定义,98% 的数据必须由单个执行程序处理。
幸运的是,这里的错误代码可能是比偏斜更大的问题。跳过:
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
这只是一个民间魔术,看起来整个管道可以重写为一个简单的 reuceByKey
。伪代码:
将
name
和本地键合并为一个键:def key(po: AnomalyPO) = ( // "major" key po.getProcessCreator.name, // "minor" key po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID )
包含名称、日期和其他字段的键应该比单独的名称具有更高的基数。
Map to pairs and reduce by key:
rddAnomalies .map(po => (key(po), po)) .reduceByKey((x, y) => if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y )