使用 Spark 处理大于集群可以容纳的数据集

Using Spark to process dataset larger than the cluster can fit

我在 Spark 2.3 集群上有 5 个节点,每个节点有 12Gb 的可用内存,我正在尝试使用大约 130Gb 的 Parquet 数据集,我在其上创建了一个分区的外部 Hive table .

假设我想知道数据集中的记录数。我最初的猜测是 Spark 会逐个分区读取数据,聚合当前分区以获取记录数,将结果传递给驱动程序,然后删除该分区以读取下一个分区。但是,这不是它的工作方式(相反,Spark 尝试首先读取内存中的所有内容),或者我编码错误。

sql("select count(*) from myhivetable") 这样愚蠢的方法不起作用:作业失败并出现 java 堆 space 错误。 sql("select * from myhivetable").count() 也没有(我想它们无论如何都被编译为相同的执行计划)。

我可能会忘记配置单元 table,使用文件系统 API 获取包含 table 的文件列表,然后逐个文件对记录进行计数,总结计入结果,例如 fileList.foldLeft{ (recCount, file) => file.read(parquet).count + recCount} —— 但是,a) 此方法可能不会 "scale" 用于其他可能的用例,b) 我仍然认为应该有更优雅的方法来做到这一点只使用 Spark 工具集。我没看到吗?

您不需要集群内存来容纳所有数据。这是镶木地板数据,因此解压缩需要一些内存。尝试增加执行程序内存(它是计数查询,因此驱动程序内存应该不是问题)。如果在此之后仍然失败,请分享堆栈跟踪。

假设每个节点有 8 个核心,您可以尝试将这些参数与 spark-submitspark-shell 一起使用:

Total memory - 5 * 12GB = 60GB Total Cores - 5 * 8 = 40

--driver-cores 1
--driver-memory 2G
--num-executors 9
--executor-cores 4
--executor-memory 6G

如果还是不行,你能换个号码再试一次吗?请 post 错误日志,Spark UI 屏幕截图。

我不确定使用 SELECT COUNT(1) FROM table 而不是 SELECT COUNT(*) FROM table 是否有任何区别!!