spark - 如何减少 JavaPairRDD<Integer, Integer[]> 的随机播放大小?
spark - How to reduce the shuffle size of a JavaPairRDD<Integer, Integer[]>?
我有一个 JavaPairRDD<Integer, Integer[]>
,我想对其执行 groupByKey
操作。
groupByKey
操作给了我:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
如果我没记错的话,这实际上是一个 OutOfMemory 错误。这只发生在大数据集中(在我的例子中,Web UI 中显示的 "Shuffle Write" 约为 96GB)。
我已设置:
spark.serializer org.apache.spark.serializer.KryoSerializer
在$SPARK_HOME/conf/spark-defaults.conf
,但我不确定Kryo是否用于序列化我的JavaPairRDD。
除了设置这个 conf 参数之外,我还应该做些什么来使用 Kryo 来序列化我的 RDD?我可以在 serialization instructions 中看到:
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.
还有那个:
Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
我还注意到,当我将 spark.serializer 设置为 Kryo 时,Web 中的 Shuffle Write UI 从 ~96GB(使用默认序列化器)增加到 243GB!
编辑: 在评论中,有人问我程序的逻辑,以防 groupByKey 可以替换为 reduceByKey。我不认为这是可能的,但无论如何:
输入的形式为:
- key:索引桶id,
- 值:此存储桶中实体 ID 的整数数组
随机写入操作生成以下形式的对:
- entityId
- 同一桶中所有实体 ID 的整数数组(称它们为邻居)
groupByKey
操作收集每个实体的所有邻居数组,有些可能出现不止一次(在许多桶中)。
在 groupByKey
操作之后,我为每个桶保留一个权重(基于它包含的负实体 ID 的数量),并为每个邻居 ID 总结它所属的桶。
我用另一个值(假设它是给定的)对每个邻居 ID 的分数进行归一化,并发出每个实体的前 3 个邻居。
我得到的不同键的数量约为 1000 万(大约 500 万个正实体 ID 和 500 万个负实体 ID)。
EDIT2:我尝试分别使用 Hadoop 的 Writables(VIntWritable 和 VIntArrayWritable 扩展 ArrayWritable)而不是 Integer 和 Integer[],但是随机播放的大小仍然大于默认的 JavaSerializer .
然后我将 spark.shuffle.memoryFraction
从 0.2 增加到 0.4(即使在 2.1.0 版中已弃用,也没有说明应该使用什么来代替)并启用 offHeap 内存,并且减少了 shuffle 大小约 20GB。即使这符合标题的要求,我还是更喜欢算法更多的解决方案,或者包含更好压缩的解决方案。
我认为这里推荐的最佳方法(没有更具体的输入数据知识)通常是在输入 RDD 上使用持久化 API。
作为第一步,我会尝试在输入 RDD 上调用 .persist(MEMORY_ONLY_SER)
以降低内存使用量(尽管有一定的 CPU 开销,但不应该那么多int
s 在你的情况下的问题)。
如果这还不够,您可以尝试 .persist(MEMORY_AND_DISK_SER)
,或者如果您的随机播放仍然占用太多内存,以至于需要在内存上简化输入数据集 .persist(DISK_ONLY)
可能是一个选项,但会严重降低性能。
简答: 使用 fastutil 并可能增加 spark.shuffle.memoryFraction
.
更多详情:
这个 RDD 的问题是 Java 需要存储 Object
个引用,它消耗的 space 比原始类型多得多。在这个例子中,我需要存储 Integer
s,而不是 int
值。 Java Integer
占用 16 个字节,而原始 Java int
占用 4 个字节。另一方面,Scala 的 Int
类型是 32 位(4 字节)类型,就像 Java 的 int
一样,这就是为什么使用 Scala 的人可能没有遇到过一些事情相似的。
除了将 spark.shuffle.memoryFraction
增加到 0.4 之外,另一个不错的解决方案是使用 fastutil library, as suggest in Spark's tuning documentation:
The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this: Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.
这可以将我的 RDD 对的 int 数组中的每个元素存储为 int
类型(即,对数组的每个元素使用 4 个字节而不是 16 个字节)。就我而言,我使用 IntArrayList
而不是 Integer[]
。这使得 shuffle 大小显着下降,并允许我的程序在集群中 运行。我还在代码的其他部分使用了这个库,在那里我制作了一些临时的 Map 结构。总体而言,通过将 spark.shuffle.memoryFraction
增加到 0.4 并使用 fastutil 库,使用默认的 Java 序列化程序(不是 Kryo),shuffle 大小从 96GB 下降到 50GB (!)。
备选方案: 我还尝试对 rdd 对的每个 int 数组进行排序,并使用 Hadoop 的 VIntArrayWritable 类型存储增量(较小的数字比较大的数字使用更少 space ),但这还需要在 Kryo 中注册 VIntWritable 和 VIntArrayWritable,毕竟它没有保存任何 space。总的来说,我认为 Kryo 只会让事情运行得更快,但不会减少所需的 space,但我仍然不确定。
我还没有将这个答案标记为已接受,因为其他人可能有更好的主意,而且因为我毕竟没有使用 Kryo,正如我的 OP 所要求的那样。我希望阅读它,能帮助其他人解决同样的问题。如果我设法进一步减小随机播放大小,我将更新此答案。
仍然不太确定你想做什么。但是,因为你用groupByKey
,说用[=11=就没办法了,所以让我更迷糊了。
我想你有 rdd = (Integer, Integer[])
并且你想要像 (Integer, Iterable[Integer[]])
这样的东西,这就是你使用 groupByKey
的原因。
无论如何,我不太熟悉 Spark 中的 Java,但在 Scala 中我会使用 reduceByKey
来避免随机播放
rdd.mapValues(Iterable(_)).reduceByKey(_++_)
。基本上,您想将值转换为数组列表,然后将列表组合在一起。
我有一个 JavaPairRDD<Integer, Integer[]>
,我想对其执行 groupByKey
操作。
groupByKey
操作给了我:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
如果我没记错的话,这实际上是一个 OutOfMemory 错误。这只发生在大数据集中(在我的例子中,Web UI 中显示的 "Shuffle Write" 约为 96GB)。
我已设置:
spark.serializer org.apache.spark.serializer.KryoSerializer
在$SPARK_HOME/conf/spark-defaults.conf
,但我不确定Kryo是否用于序列化我的JavaPairRDD。
除了设置这个 conf 参数之外,我还应该做些什么来使用 Kryo 来序列化我的 RDD?我可以在 serialization instructions 中看到:
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.
还有那个:
Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
我还注意到,当我将 spark.serializer 设置为 Kryo 时,Web 中的 Shuffle Write UI 从 ~96GB(使用默认序列化器)增加到 243GB!
编辑: 在评论中,有人问我程序的逻辑,以防 groupByKey 可以替换为 reduceByKey。我不认为这是可能的,但无论如何:
输入的形式为:
- key:索引桶id,
- 值:此存储桶中实体 ID 的整数数组
随机写入操作生成以下形式的对:
- entityId
- 同一桶中所有实体 ID 的整数数组(称它们为邻居)
groupByKey
操作收集每个实体的所有邻居数组,有些可能出现不止一次(在许多桶中)。在
groupByKey
操作之后,我为每个桶保留一个权重(基于它包含的负实体 ID 的数量),并为每个邻居 ID 总结它所属的桶。我用另一个值(假设它是给定的)对每个邻居 ID 的分数进行归一化,并发出每个实体的前 3 个邻居。
我得到的不同键的数量约为 1000 万(大约 500 万个正实体 ID 和 500 万个负实体 ID)。
EDIT2:我尝试分别使用 Hadoop 的 Writables(VIntWritable 和 VIntArrayWritable 扩展 ArrayWritable)而不是 Integer 和 Integer[],但是随机播放的大小仍然大于默认的 JavaSerializer .
然后我将 spark.shuffle.memoryFraction
从 0.2 增加到 0.4(即使在 2.1.0 版中已弃用,也没有说明应该使用什么来代替)并启用 offHeap 内存,并且减少了 shuffle 大小约 20GB。即使这符合标题的要求,我还是更喜欢算法更多的解决方案,或者包含更好压缩的解决方案。
我认为这里推荐的最佳方法(没有更具体的输入数据知识)通常是在输入 RDD 上使用持久化 API。
作为第一步,我会尝试在输入 RDD 上调用 .persist(MEMORY_ONLY_SER)
以降低内存使用量(尽管有一定的 CPU 开销,但不应该那么多int
s 在你的情况下的问题)。
如果这还不够,您可以尝试 .persist(MEMORY_AND_DISK_SER)
,或者如果您的随机播放仍然占用太多内存,以至于需要在内存上简化输入数据集 .persist(DISK_ONLY)
可能是一个选项,但会严重降低性能。
简答: 使用 fastutil 并可能增加 spark.shuffle.memoryFraction
.
更多详情:
这个 RDD 的问题是 Java 需要存储 Object
个引用,它消耗的 space 比原始类型多得多。在这个例子中,我需要存储 Integer
s,而不是 int
值。 Java Integer
占用 16 个字节,而原始 Java int
占用 4 个字节。另一方面,Scala 的 Int
类型是 32 位(4 字节)类型,就像 Java 的 int
一样,这就是为什么使用 Scala 的人可能没有遇到过一些事情相似的。
除了将 spark.shuffle.memoryFraction
增加到 0.4 之外,另一个不错的解决方案是使用 fastutil library, as suggest in Spark's tuning documentation:
The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this: Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.
这可以将我的 RDD 对的 int 数组中的每个元素存储为 int
类型(即,对数组的每个元素使用 4 个字节而不是 16 个字节)。就我而言,我使用 IntArrayList
而不是 Integer[]
。这使得 shuffle 大小显着下降,并允许我的程序在集群中 运行。我还在代码的其他部分使用了这个库,在那里我制作了一些临时的 Map 结构。总体而言,通过将 spark.shuffle.memoryFraction
增加到 0.4 并使用 fastutil 库,使用默认的 Java 序列化程序(不是 Kryo),shuffle 大小从 96GB 下降到 50GB (!)。
备选方案: 我还尝试对 rdd 对的每个 int 数组进行排序,并使用 Hadoop 的 VIntArrayWritable 类型存储增量(较小的数字比较大的数字使用更少 space ),但这还需要在 Kryo 中注册 VIntWritable 和 VIntArrayWritable,毕竟它没有保存任何 space。总的来说,我认为 Kryo 只会让事情运行得更快,但不会减少所需的 space,但我仍然不确定。
我还没有将这个答案标记为已接受,因为其他人可能有更好的主意,而且因为我毕竟没有使用 Kryo,正如我的 OP 所要求的那样。我希望阅读它,能帮助其他人解决同样的问题。如果我设法进一步减小随机播放大小,我将更新此答案。
仍然不太确定你想做什么。但是,因为你用groupByKey
,说用[=11=就没办法了,所以让我更迷糊了。
我想你有 rdd = (Integer, Integer[])
并且你想要像 (Integer, Iterable[Integer[]])
这样的东西,这就是你使用 groupByKey
的原因。
无论如何,我不太熟悉 Spark 中的 Java,但在 Scala 中我会使用 reduceByKey
来避免随机播放
rdd.mapValues(Iterable(_)).reduceByKey(_++_)
。基本上,您想将值转换为数组列表,然后将列表组合在一起。