在 Spark 中导入镶木地板文件时出现内存问题

Memory issue when importing parquet files in Spark

我正在尝试从 Scala Spark (1.5) 中的镶木地板文件中查询数据,包括 200 万行的查询(以下代码中的"variants")。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")

val parquetFile = sqlContext.read.parquet(<path>)

parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")

val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...)

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... )

当获取的行数较少时运行良好,但在请求大量数据时失败并出现 "Size exceeds Integer.MAX_VALUE" 错误。 错误如下所示:

User class threw exception: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times,
most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009):
java.lang.RuntimeException: java.lang.IllegalArgumentException:
Size exceeds Integer.MAX_VALUE at
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes.apply(DiskStore.scala:125) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes.apply(DiskStore.scala:113) at ...

我该怎么做才能完成这项工作?

这看起来像是一个内存问题,但我已经尝试使用多达 100 个执行器,没有任何区别(无论涉及多少执行器,失败所需的时间都保持不变)。感觉数据没有跨节点分区?

我试图通过天真地替换这条线来强制更高的并行化,但无济于事:

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... ).repartition(sc.defaultParallelism*10)

我认为这个问题不是镶木地板特有的。您 "hitting" 限制了 Spark 中分区的最大大小。

Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at ...

Integer.MAX_VALUE 检测到您的分区大小(我相信)超过 2GB(需要超过 int32 来为其编制索引)。

Joe Widen 的评论很到位。您需要对数据进行更多的重新分区。尝试 1000 或更多。

例如,

val data = sqlContext.read.parquet("data.parquet").rdd.repartition(1000).toDF