为什么在 DataFrame 上使用 union()/coalesce(1,false) 时会在 Spark 中混洗大量数据?
Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?
我有 Spark 作业,它对 ORC 数据进行一些处理并使用 Spark 1.4.0 中引入的 DataFrameWriter save() API 存储回 ORC 数据。我有以下使用大量随机播放内存的代码。如何优化以下代码?它有什么问题吗?它按预期工作正常,只是由于 GC 暂停和洗牌大量数据而导致速度变慢,从而导致内存问题。我是 Spark 的新手。
JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1, false).map(new Function<Row, Row>() {
@Override
public Row call(Row row) throws Exception {
List<Object> rowAsList;
Row row1 = null;
if (row != null) {
rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
row1 = RowFactory.create(rowAsList.toArray());
}
return row1;
}
}).union(modifiedRDD);
DataFrame updatedDataFrame = hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity", "date").save("baseTable");
编辑
根据建议,我尝试使用 mapPartitionsWithIndex
() 将上面的代码转换为以下代码,但我仍然看到数据改组它比上面的代码更好,但它仍然因达到 GC 限制并抛出 OOM 或继续而失败进入 GC 暂停长时间和超时,YARN 将杀死执行程序。
我使用 spark.storage.memoryFraction 作为 0.5,spark.shuffle.memoryFraction 作为 0.4;我尝试使用默认值并更改了很多组合,但没有任何帮助。
JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
@Override
public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
List<Row> rowList = new ArrayList<>();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
Row updatedRow = RowFactory.create(rowAsList.toArray());
rowList.add(updatedRow);
}
return rowList.iterator();
}
}, true).coalesce(200,true);
将 RDD 或 Dataframe 合并到单个分区意味着您的所有处理都在一台机器上进行。这不是一件好事,原因有很多:所有数据都必须通过网络进行洗牌,没有更多的并行性等。相反,您应该看看其他操作符,例如 reduceByKey、mapPartitions,或者除此之外的几乎所有其他操作符将数据合并到一台机器上。
注意:看看你的代码我没看到为什么你要把它放到一台机器上,你可以删除那部分。
我有 Spark 作业,它对 ORC 数据进行一些处理并使用 Spark 1.4.0 中引入的 DataFrameWriter save() API 存储回 ORC 数据。我有以下使用大量随机播放内存的代码。如何优化以下代码?它有什么问题吗?它按预期工作正常,只是由于 GC 暂停和洗牌大量数据而导致速度变慢,从而导致内存问题。我是 Spark 的新手。
JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1, false).map(new Function<Row, Row>() {
@Override
public Row call(Row row) throws Exception {
List<Object> rowAsList;
Row row1 = null;
if (row != null) {
rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
row1 = RowFactory.create(rowAsList.toArray());
}
return row1;
}
}).union(modifiedRDD);
DataFrame updatedDataFrame = hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity", "date").save("baseTable");
编辑
根据建议,我尝试使用 mapPartitionsWithIndex
() 将上面的代码转换为以下代码,但我仍然看到数据改组它比上面的代码更好,但它仍然因达到 GC 限制并抛出 OOM 或继续而失败进入 GC 暂停长时间和超时,YARN 将杀死执行程序。
我使用 spark.storage.memoryFraction 作为 0.5,spark.shuffle.memoryFraction 作为 0.4;我尝试使用默认值并更改了很多组合,但没有任何帮助。
JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
@Override
public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
List<Row> rowList = new ArrayList<>();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
Row updatedRow = RowFactory.create(rowAsList.toArray());
rowList.add(updatedRow);
}
return rowList.iterator();
}
}, true).coalesce(200,true);
将 RDD 或 Dataframe 合并到单个分区意味着您的所有处理都在一台机器上进行。这不是一件好事,原因有很多:所有数据都必须通过网络进行洗牌,没有更多的并行性等。相反,您应该看看其他操作符,例如 reduceByKey、mapPartitions,或者除此之外的几乎所有其他操作符将数据合并到一台机器上。
注意:看看你的代码我没看到为什么你要把它放到一台机器上,你可以删除那部分。