Spark - 读取许多小的镶木地板文件事先获取每个文件的状态

Spark - Reading many small parquet files gets status of each file before hand

我有数十万个较小的 parquet 文件,我正尝试定期使用 Spark 读入这些文件。我的应用程序运行,但在使用执行程序节点读入文件之前,驱动程序节点似乎正在获取每个文件的状态。我读了一下,这是推断架构和分区所必需的。我试过这样提供它们:

sparkSession.baseRelationToDataFrame(
  DataSource
    .apply(
      sparkSession,
      paths = paths, // List of thousands of parquet files in S3
      partitionColumns = Seq("my_join_column"),
      userSpecifiedSchema = Some(schema),
      className = "parquet",
      options = Seq().toMap
    )
    .resolveRelation(checkFilesExist = false)
)

但即使在提供架构和分区列时,也需要一段时间才能完成。在稍微查看 resolveRelation 代码之后,看起来它仍然需要查询每个文件的状态才能构建 InMemoryFileIndex

有什么办法可以解决这个问题吗?

我正在使用 spark-sql 2.3.1.

目前的Spark架构还没有很好的办法避免这个问题。

不久前,我与一些 Spark 提交者合作进行了一项 LazyBaseRelation 设计,该设计可以延迟发现文件信息,直到必须知道数据源的分区数量(而不只是模式) ,在操作必须 运行 之前,这在技术上不是必需的,但我们从未完成这项工作。即便如此,当执行一个动作的时候,你也会受到打击。

有四种实用方法可以加快初始文件发现速度:

  1. 使用大型集群,因为文件发现的某些方面是分布式的。在某些环境中,您可以在发现完成后缩小集群。
  2. 需要使用数据之前进行初步发现,以便希望在您需要时可以使用这些数据。我们在具有三级分区的数百万个大型 Parquet 文件中拥有数 PB 的数据。我们使用计划作业来刷新内存中的文件索引。
  3. 如果您使用的是 Databricks,请使用 Delta 的 OPTIMIZE 将小的 Parquet 文件合并为更少、更大的文件。请注意,Delta 需要额外付费。
  4. 自己实现 OPTIMIZE 的等价物,重写数据的子集。您能否轻松做到这一点取决于访问模式:您必须考虑幂等性和一致性。

完成初始发现后,缓存内存中的文件列表是您最好的朋友。有两种方法:

  • 使用 Metastore,将您的数据注册为外部 table。您是否可以轻松地做到这一点取决于数据更新模式。如果数据是自然分区的,您可以使用 DDL 添加分区,并且可以轻松实现上面的策略 (4)。

  • 构建您自己的 table 管理器。这就是我们所做的,因为 Metastore 实现对模式演变有 unacceptable 限制。您必须决定范围:driver/JVM-and SparkSession 是两个显而易见的选择。

祝你好运!