为什么 Spark RDD 分区对 HDFS 有 2GB 的限制?

Why does Spark RDD partition has 2GB limit for HDFS?

我在使用 mllib RandomForest 训练数据时遇到错误。由于我的数据集很大,默认分区相对较小。所以抛出一个异常,表明 "Size exceeds Integer.MAX_VALUE" ,原始堆栈跟踪如下,

15/04/16 14:13:03 WARN scheduler.TaskSetManager: Lost task 19.0 in stage 6.0 (TID 120, 10.215.149.47): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:146) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

Integer.MAX_SIZE是2GB,好像是某个分区内存不足。所以我将我的 rdd 分区重新分配到 1000,这样每个分区可以容纳比以前少得多的数据。终于,问题解决了!!!

所以,我的问题是: 为什么分区大小有2G的限制?好像spark

里面没有设置limit的configure

spark 中块的基本抽象是 ByteBuffer,不幸的是它有一个限制 Integer.MAX_VALUE (~2GB)。

它是一个 critical issue,可防止在非常大的数据集上使用 spark。 增加分区的数量可以解决它(就像在 OP 的情况下),但并不总是可行的,例如,当有大量的转换链,其中一部分可以增加数据(flatMap 等)或在数据倾斜的情况下。

提出的解决方案是想出一个像LargeByteBuffer这样的抽象,它可以支持一个块的字节缓冲区列表。这会影响整体的 spark 架构,因此很长一段时间都没有解决。

问题是在使用 Casandra、HBase 或 Accumulo 等数据存储时,块大小基于数据存储拆分(可能超过 10 gig)。从这些数据存储加载数据时,您必须立即使用 1000 个分区重新分区,这样您就可以在不超过 2gig 限制的情况下操作数据。

大多数使用 spark 的人并没有真正使用大数据;对他们来说,如果 excel 可以容纳更大,或者画面对他们来说是大数据;大多数数据科学家使用高质量数据或使用足够小的样本量来处理极限。

当处理大量数据时,我不再需要返回到 mapreduce 并且只在数据被清理后才使用 spark。不幸的是,Spark 社区的大多数人都没有兴趣解决这个问题。

一个简单的解决方案是创建一个抽象并默认使用 bytearray;但是,允许使用 64 位数据指针重载 spark 作业以处理大型作业。

Spark 2.4.0 release removes this limit by replicating block data as a stream. See Spark-24926了解详情。