以分布式方式在 Spark 中读取 CSV 文件
Reading CSV file in Spark in a distributed manner
我正在开发一个 Spark 处理框架,它读取大型 CSV 文件,将它们加载到 RDD 中,执行一些转换,最后保存一些统计信息。
有问题的 CSV 文件平均约为 50GB。我正在使用 Spark 2.0.
我的问题是:
当我使用sparkContext.textFile()函数加载文件时,是否需要先将文件存储在驱动程序的内存中,然后再分发给工作人员(因此需要相当大的数量驱动程序上的内存)?或者文件由每个工作人员读取 "in parallel",其中 none 需要存储整个文件,而驱动程序仅充当 "manager"?
提前致谢
当您定义读取时,文件将根据您的并行方案划分为多个分区,并将指令发送给工作人员。然后文件由工作人员直接从文件系统读取(因此需要一个分布式文件系统可用于所有节点,如 HDFS)。
作为旁注,使用 spark.read.csv 而不是在 RDD 中将其读入数据帧会更好。这将占用更少的内存,并允许 spark 优化您的查询。
更新
在评论中,有人问如果文件系统不分布式,文件只位于一台机器上,会发生什么情况。
答案是,如果你有超过 1 台机器,它很可能会失败。
当您执行 sparkContext.textFile 时,实际上并没有读取任何内容,它只是告诉 spark 您想要读取的内容。然后你对它做了一些转换,但仍然没有任何内容被读取,因为你正在定义一个计划。一旦你执行了一个动作(例如收集),那么实际的处理就开始了。 Spark 会将工作分解为任务,并将它们发送给执行者。然后,执行程序(可能在主节点或工作节点上)将尝试读取文件的一部分。问题是任何不在主节点上的执行程序都会查找该文件,但找不到它会导致任务失败。 Spark 会重试几次(我相信默认是 4 次)然后完全失败。
当然,如果你只有一个节点,那么所有的执行者都会看到这个文件,一切都会好起来的。同样在理论上,任务可能会在 worker 上失败,然后在 master 上重新运行并在那里成功,但在任何情况下,worker 都不会做任何工作,除非他们看到文件的副本。
您可以通过将文件复制到所有节点中完全相同的路径或使用任何类型的分布式文件系统(甚至 NFS 共享也可以)来解决此问题。
当然,您始终可以在单个节点上工作,但那样您就无法利用 spark 的可扩展性。
我正在开发一个 Spark 处理框架,它读取大型 CSV 文件,将它们加载到 RDD 中,执行一些转换,最后保存一些统计信息。
有问题的 CSV 文件平均约为 50GB。我正在使用 Spark 2.0.
我的问题是:
当我使用sparkContext.textFile()函数加载文件时,是否需要先将文件存储在驱动程序的内存中,然后再分发给工作人员(因此需要相当大的数量驱动程序上的内存)?或者文件由每个工作人员读取 "in parallel",其中 none 需要存储整个文件,而驱动程序仅充当 "manager"?
提前致谢
当您定义读取时,文件将根据您的并行方案划分为多个分区,并将指令发送给工作人员。然后文件由工作人员直接从文件系统读取(因此需要一个分布式文件系统可用于所有节点,如 HDFS)。
作为旁注,使用 spark.read.csv 而不是在 RDD 中将其读入数据帧会更好。这将占用更少的内存,并允许 spark 优化您的查询。
更新
在评论中,有人问如果文件系统不分布式,文件只位于一台机器上,会发生什么情况。 答案是,如果你有超过 1 台机器,它很可能会失败。
当您执行 sparkContext.textFile 时,实际上并没有读取任何内容,它只是告诉 spark 您想要读取的内容。然后你对它做了一些转换,但仍然没有任何内容被读取,因为你正在定义一个计划。一旦你执行了一个动作(例如收集),那么实际的处理就开始了。 Spark 会将工作分解为任务,并将它们发送给执行者。然后,执行程序(可能在主节点或工作节点上)将尝试读取文件的一部分。问题是任何不在主节点上的执行程序都会查找该文件,但找不到它会导致任务失败。 Spark 会重试几次(我相信默认是 4 次)然后完全失败。
当然,如果你只有一个节点,那么所有的执行者都会看到这个文件,一切都会好起来的。同样在理论上,任务可能会在 worker 上失败,然后在 master 上重新运行并在那里成功,但在任何情况下,worker 都不会做任何工作,除非他们看到文件的副本。
您可以通过将文件复制到所有节点中完全相同的路径或使用任何类型的分布式文件系统(甚至 NFS 共享也可以)来解决此问题。
当然,您始终可以在单个节点上工作,但那样您就无法利用 spark 的可扩展性。