如何确保从 Parquet 加载 Spark DataFrame 是分布式和并行的?
How to ensure that loading of Spark DataFrame from Parquet is distributed and parallelized?
当 Spark 将文件中的源数据加载到 DataFrame 中时,哪些因素决定了数据是完全加载到单个节点(最有可能是 driver/master 节点)还是最小并行子集中的内存中需要计算(大概在 worker/executor 节点上)?
特别是,如果使用 Parquet 作为输入格式并通过 Spark DataFrame 加载 API,需要考虑哪些因素才能确保从 Parquet 文件加载并行化并延迟到执行程序,以及范围仅限于相关执行程序节点上计算所需的列?
(我希望了解 Spark 在分布式执行计划中用于调度源数据加载的机制,以避免通过加载完整数据集耗尽任何一个节点上的内存。)
只要你使用spark操作,所有的数据转换和聚合都只在executor上进行。因此驱动程序不需要加载数据,它的工作是管理处理流程。驱动程序仅在您使用某些终端操作时获取数据,例如 collect()
、first()
、show()
、toPandas()
、toLocalIterator()
等。此外,执行器不会将所有文件内容加载到内存中,而是获取最小的可能块(称为分区)。
如果您使用 Parquet 等列存储格式,则仅加载执行计划所需的列 - 这是 spark 中的默认行为。
编辑:我刚刚看到 spark 中可能存在错误,如果您在架构中使用嵌套列,则可能会加载不必要的列,请参阅:Why does Apache Spark read unnecessary Parquet columns within nested structures?
当 Spark 将文件中的源数据加载到 DataFrame 中时,哪些因素决定了数据是完全加载到单个节点(最有可能是 driver/master 节点)还是最小并行子集中的内存中需要计算(大概在 worker/executor 节点上)?
特别是,如果使用 Parquet 作为输入格式并通过 Spark DataFrame 加载 API,需要考虑哪些因素才能确保从 Parquet 文件加载并行化并延迟到执行程序,以及范围仅限于相关执行程序节点上计算所需的列?
(我希望了解 Spark 在分布式执行计划中用于调度源数据加载的机制,以避免通过加载完整数据集耗尽任何一个节点上的内存。)
只要你使用spark操作,所有的数据转换和聚合都只在executor上进行。因此驱动程序不需要加载数据,它的工作是管理处理流程。驱动程序仅在您使用某些终端操作时获取数据,例如 collect()
、first()
、show()
、toPandas()
、toLocalIterator()
等。此外,执行器不会将所有文件内容加载到内存中,而是获取最小的可能块(称为分区)。
如果您使用 Parquet 等列存储格式,则仅加载执行计划所需的列 - 这是 spark 中的默认行为。
编辑:我刚刚看到 spark 中可能存在错误,如果您在架构中使用嵌套列,则可能会加载不必要的列,请参阅:Why does Apache Spark read unnecessary Parquet columns within nested structures?