SQL query in Spark/Scala: Size exceeds Integer.MAX_VALUE
I am trying to run a simple SQL query over S3 events using Spark. I am loading ~30GB of JSON files as follows:
val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");
Then I try to write the result of the query to a file:
val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");
But Spark throws the following exception:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Note that the same query works on smaller amounts of data. What is the problem here?
No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes), so you need more/smaller partitions.
You should adjust spark.default.parallelism and spark.sql.shuffle.partitions (default 200) so that the number of partitions can accommodate your data without hitting the 2GB limit (you could aim for roughly 256MB per partition, so for 200GB of data you get about 800 partitions). Thousands of partitions is very common, so don't be afraid to repartition to 1000 as suggested.
FYI, you can check the number of partitions of an RDD with something like rdd.getNumPartitions (i.e. d2.rdd.getNumPartitions).
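A minimal sketch of how these settings could be applied to the query from the question (the 1000-partition figure and the app name are illustrative assumptions, not tuned values):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .appName("UsersCount")  // hypothetical app name
  // more, smaller shuffle partitions so no single shuffle block approaches 2GB
  .config("spark.sql.shuffle.partitions", "1000")
  .config("spark.default.parallelism", "1000")
  .getOrCreate()

// repartition the input before caching so the persisted blocks stay small as well
val d2 = spark.read.json("s3n://myData/2017/02/01/1234").repartition(1000)
d2.persist(StorageLevel.MEMORY_AND_DISK)
d2.createOrReplaceTempView("d2")

// check how many partitions you actually ended up with
println(d2.rdd.getNumPartitions)

val users_count = spark.sql("select count(distinct data.user_id) from d2")
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv")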
There is a JIRA issue tracking the effort to address the various 2GB limits (it has been open for a while now): https://issues.apache.org/jira/browse/SPARK-6235
See http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 for more details on this error.
When I processed 200G of data with Spark Core, I set --conf spark.default.parallelism=2000 and .repartition(100), but the error still occurred. In the end, I solved it with the following setting:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName(appName)
  .set("spark.rdd.compress", "true")
See the description of spark.rdd.compress in the Spark configuration documentation.
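As a side note (my own reading of the docs, not stated in the original answer): spark.rdd.compress only compresses serialized cached partitions, so it pays off most when combined with a serialized storage level. A rough sketch, with the app name and parallelism as placeholder values:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("users-count")                // placeholder app name
  .set("spark.default.parallelism", "2000")
  .set("spark.rdd.compress", "true")        // compress serialized cached partitions

val spark = SparkSession.builder().config(conf).getOrCreate()

// a serialized storage level is what spark.rdd.compress actually compresses,
// which keeps the individual cached blocks smaller than plain MEMORY_AND_DISK
val d2 = spark.read.json("s3n://myData/2017/02/01/1234")
d2.persist(StorageLevel.MEMORY_AND_DISK_SER)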
Hope this helps.