使用 Dataset.groupByKey 时如何绕过 2GB 缓冲区限制?
How can you get around the 2GB buffer limit when using Dataset.groupByKey?
在 Spark 中使用 Dataset.groupByKey(_.key).mapGroups
或 Dataset.groupByKey(_.key).cogroup
时,当其中一个分组产生超过 2GB 的数据时,我 运行 遇到了问题。
在开始减少数据之前,我需要按组对数据进行归一化,并且我想将这些组拆分为更小的子组,以便更好地分布。例如,这是我尝试拆分组的一种方法:
val groupedInputs = inputData.groupByKey(_.key).mapGroups {
case(key, inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group))
}
但不幸的是,无论我如何尝试解决它,我的工作总是因这样的错误而终止:java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 23816 because the size after growing exceeds size limitation 2147483632
。使用 Kryo 序列化时,我收到一个不同的 Kryo serialization failed: Buffer overflow
错误,建议我增加 spark.kryoserializer.buffer.max,但我已经将其增加到 2GB 限制。
我想到的一个解决方案是在对键进行分组之前向键添加一个随机值。这并不理想,因为它会拆分每个组(不仅仅是大组),但我愿意为了 "working" 而牺牲 "ideal"。该代码看起来像这样:
val splitInputs = inputData.map( record => (record, ThreadLocalRandom.current.nextInt(splitFactor)))
val groupedInputs = splitInputs.groupByKey{ case(record, split) => (record.key, split)).mapGroups {
case((key, _), inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group.map(_._1)))
}
添加一个 salt key 并在你的密钥和 salt key 上执行 groupBy,然后
import scala.util.Random
val start = 1
val end = 5
val randUdf = udf({() => start + Random.nextInt((end - start) + 1)})
val saltGroupBy=skewDF.withColumn("salt_key", randUdf())
.groupBy(col("name"), col("salt_key"))
因此您所有的倾斜数据不会进入一个执行程序并导致 2GB 限制。
但是你必须开发一个逻辑来聚合上面的结果,最后去掉最后的salt key。
当您使用 groupBy 时,具有相同键的所有记录将到达一个执行程序并发生瓶颈。
以上是其中一种缓解方法。
在这种情况下,数据集有很大偏差并且将记录分组为规则大小的组很重要,我决定分两次处理数据集。首先,我使用 window 函数按键对行进行编号,然后根据可配置的 "maxGroupSize":
将其转换为 "group index,"
// The "orderBy" doesn't seem necessary here,
// but the row_number function requires it.
val partitionByKey = Window.partitionBy(key).orderBy(key)
val indexedData = inputData.withColumn("groupIndex",
(row_number.over(partitionByKey) / maxGroupSize).cast(IntegerType))
.as[(Record, Int)]
然后我可以按键和索引进行分组,并生成大小一致的组——记录很多的键被拆分得更多,而记录很少的键可能根本不会拆分。
indexedData.groupByKey{ case (record, groupIndex) => (record.key, groupIndex) }
.mapGroups{ case((key, _), recordGroup) =>
// Remove the index values before returning the groups
(key, recordGroup.map(_._1))
}
在 Spark 中使用 Dataset.groupByKey(_.key).mapGroups
或 Dataset.groupByKey(_.key).cogroup
时,当其中一个分组产生超过 2GB 的数据时,我 运行 遇到了问题。
在开始减少数据之前,我需要按组对数据进行归一化,并且我想将这些组拆分为更小的子组,以便更好地分布。例如,这是我尝试拆分组的一种方法:
val groupedInputs = inputData.groupByKey(_.key).mapGroups {
case(key, inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group))
}
但不幸的是,无论我如何尝试解决它,我的工作总是因这样的错误而终止:java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 23816 because the size after growing exceeds size limitation 2147483632
。使用 Kryo 序列化时,我收到一个不同的 Kryo serialization failed: Buffer overflow
错误,建议我增加 spark.kryoserializer.buffer.max,但我已经将其增加到 2GB 限制。
我想到的一个解决方案是在对键进行分组之前向键添加一个随机值。这并不理想,因为它会拆分每个组(不仅仅是大组),但我愿意为了 "working" 而牺牲 "ideal"。该代码看起来像这样:
val splitInputs = inputData.map( record => (record, ThreadLocalRandom.current.nextInt(splitFactor)))
val groupedInputs = splitInputs.groupByKey{ case(record, split) => (record.key, split)).mapGroups {
case((key, _), inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group.map(_._1)))
}
添加一个 salt key 并在你的密钥和 salt key 上执行 groupBy,然后
import scala.util.Random
val start = 1
val end = 5
val randUdf = udf({() => start + Random.nextInt((end - start) + 1)})
val saltGroupBy=skewDF.withColumn("salt_key", randUdf())
.groupBy(col("name"), col("salt_key"))
因此您所有的倾斜数据不会进入一个执行程序并导致 2GB 限制。
但是你必须开发一个逻辑来聚合上面的结果,最后去掉最后的salt key。
当您使用 groupBy 时,具有相同键的所有记录将到达一个执行程序并发生瓶颈。 以上是其中一种缓解方法。
在这种情况下,数据集有很大偏差并且将记录分组为规则大小的组很重要,我决定分两次处理数据集。首先,我使用 window 函数按键对行进行编号,然后根据可配置的 "maxGroupSize":
将其转换为 "group index,"// The "orderBy" doesn't seem necessary here,
// but the row_number function requires it.
val partitionByKey = Window.partitionBy(key).orderBy(key)
val indexedData = inputData.withColumn("groupIndex",
(row_number.over(partitionByKey) / maxGroupSize).cast(IntegerType))
.as[(Record, Int)]
然后我可以按键和索引进行分组,并生成大小一致的组——记录很多的键被拆分得更多,而记录很少的键可能根本不会拆分。
indexedData.groupByKey{ case (record, groupIndex) => (record.key, groupIndex) }
.mapGroups{ case((key, _), recordGroup) =>
// Remove the index values before returning the groups
(key, recordGroup.map(_._1))
}