Spark - 读取许多小的镶木地板文件事先获取每个文件的状态
Spark - Reading many small parquet files gets status of each file before hand
我有数十万个较小的 parquet 文件,我正尝试定期使用 Spark 读入这些文件。我的应用程序运行,但在使用执行程序节点读入文件之前,驱动程序节点似乎正在获取每个文件的状态。我读了一下,这是推断架构和分区所必需的。我试过这样提供它们:
sparkSession.baseRelationToDataFrame(
DataSource
.apply(
sparkSession,
paths = paths, // List of thousands of parquet files in S3
partitionColumns = Seq("my_join_column"),
userSpecifiedSchema = Some(schema),
className = "parquet",
options = Seq().toMap
)
.resolveRelation(checkFilesExist = false)
)
但即使在提供架构和分区列时,也需要一段时间才能完成。在稍微查看 resolveRelation
代码之后,看起来它仍然需要查询每个文件的状态才能构建 InMemoryFileIndex
。
有什么办法可以解决这个问题吗?
我正在使用 spark-sql 2.3.1
.
目前的Spark架构还没有很好的办法避免这个问题。
不久前,我与一些 Spark 提交者合作进行了一项 LazyBaseRelation
设计,该设计可以延迟发现文件信息,直到必须知道数据源的分区数量(而不只是模式) ,在操作必须 运行 之前,这在技术上不是必需的,但我们从未完成这项工作。即便如此,当执行一个动作的时候,你也会受到打击。
有四种实用方法可以加快初始文件发现速度:
- 使用大型集群,因为文件发现的某些方面是分布式的。在某些环境中,您可以在发现完成后缩小集群。
- 在需要使用数据之前进行初步发现,以便希望在您需要时可以使用这些数据。我们在具有三级分区的数百万个大型 Parquet 文件中拥有数 PB 的数据。我们使用计划作业来刷新内存中的文件索引。
- 如果您使用的是 Databricks,请使用 Delta 的
OPTIMIZE
将小的 Parquet 文件合并为更少、更大的文件。请注意,Delta 需要额外付费。
- 自己实现
OPTIMIZE
的等价物,重写数据的子集。您能否轻松做到这一点取决于访问模式:您必须考虑幂等性和一致性。
完成初始发现后,缓存内存中的文件列表是您最好的朋友。有两种方法:
使用 Metastore,将您的数据注册为外部 table。您是否可以轻松地做到这一点取决于数据更新模式。如果数据是自然分区的,您可以使用 DDL 添加分区,并且可以轻松实现上面的策略 (4)。
构建您自己的 table 管理器。这就是我们所做的,因为 Metastore 实现对模式演变有 unacceptable 限制。您必须决定范围:driver/JVM-and SparkSession
是两个显而易见的选择。
祝你好运!
我有数十万个较小的 parquet 文件,我正尝试定期使用 Spark 读入这些文件。我的应用程序运行,但在使用执行程序节点读入文件之前,驱动程序节点似乎正在获取每个文件的状态。我读了一下,这是推断架构和分区所必需的。我试过这样提供它们:
sparkSession.baseRelationToDataFrame(
DataSource
.apply(
sparkSession,
paths = paths, // List of thousands of parquet files in S3
partitionColumns = Seq("my_join_column"),
userSpecifiedSchema = Some(schema),
className = "parquet",
options = Seq().toMap
)
.resolveRelation(checkFilesExist = false)
)
但即使在提供架构和分区列时,也需要一段时间才能完成。在稍微查看 resolveRelation
代码之后,看起来它仍然需要查询每个文件的状态才能构建 InMemoryFileIndex
。
有什么办法可以解决这个问题吗?
我正在使用 spark-sql 2.3.1
.
目前的Spark架构还没有很好的办法避免这个问题。
不久前,我与一些 Spark 提交者合作进行了一项 LazyBaseRelation
设计,该设计可以延迟发现文件信息,直到必须知道数据源的分区数量(而不只是模式) ,在操作必须 运行 之前,这在技术上不是必需的,但我们从未完成这项工作。即便如此,当执行一个动作的时候,你也会受到打击。
有四种实用方法可以加快初始文件发现速度:
- 使用大型集群,因为文件发现的某些方面是分布式的。在某些环境中,您可以在发现完成后缩小集群。
- 在需要使用数据之前进行初步发现,以便希望在您需要时可以使用这些数据。我们在具有三级分区的数百万个大型 Parquet 文件中拥有数 PB 的数据。我们使用计划作业来刷新内存中的文件索引。
- 如果您使用的是 Databricks,请使用 Delta 的
OPTIMIZE
将小的 Parquet 文件合并为更少、更大的文件。请注意,Delta 需要额外付费。 - 自己实现
OPTIMIZE
的等价物,重写数据的子集。您能否轻松做到这一点取决于访问模式:您必须考虑幂等性和一致性。
完成初始发现后,缓存内存中的文件列表是您最好的朋友。有两种方法:
使用 Metastore,将您的数据注册为外部 table。您是否可以轻松地做到这一点取决于数据更新模式。如果数据是自然分区的,您可以使用 DDL 添加分区,并且可以轻松实现上面的策略 (4)。
构建您自己的 table 管理器。这就是我们所做的,因为 Metastore 实现对模式演变有 unacceptable 限制。您必须决定范围:driver/JVM-and
SparkSession
是两个显而易见的选择。
祝你好运!