Spark 数据集 - 每行的 "edit" 个镶木地板文件

Spark Dataset - "edit" parquet file for each row

上下文

我正在尝试使用 Spark/Scala 以有效地“编辑”多个镶木地板文件(可能超过 50k+)。唯一需要完成的编辑是根据给定的一组行 ID 进行删除(即删除 records/rows)。

parquet 文件作为分区 DataFrame 存储在 s3 中,示例分区如下所示:

s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet

每个分区最多可以有 100 个 parquet 文件,每个文件的大小在 50mb 到 500mb 之间。

输入

我们得到一个名为 filesToModify 的火花 Dataset[MyClass],它有 2 列:

  1. s3path: String = s3 中需要编辑的 parquet 文件的完整 s3 路径
  2. ids: Set[String] = 位于s3path
  3. 的parquet文件中需要删除的一组ID(行)

示例输入数据集 filesToModify:

s3path ids
s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet Set("a", "b")
s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet Set("b")

预期行为

给定 filesToModify 我想利用 Spark 中的并行性对每个 row:

执行以下操作
  1. 加载位于row.s3path
  2. 的parquet文件
  3. 过滤,以便我们排除 id 在集合 row.ids
  4. 中的任何行
  5. 计算 row.ids 中每个 ID 的 deleted/excluded 行数(可选)
  6. 将过滤后的数据存回原处row.s3path覆盖文件
  7. Return删除的行数(可选)

我试过的

我试过使用 filesToModify.map(row => deleteIDs(row.s3path, row.ids)),其中 deleteIDs 看起来像这样:

def deleteIDs(s3path: String, ids: Set[String]): Int = {
    import spark.implicits._
    val data = spark
        .read
        .parquet(s3path)
        .as[DataModel]

    val clean = data
        .filter(not(col("id").isInCollection(ids)))

    // write to a temp directory and then upload to s3 with same
    // prefix as original file to overwrite it
    writeToSingleFile(clean, s3path)

    1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
    }

然而,当在 map 操作中执行时,这会导致 NullPointerException。如果我在 map 块之外单独执行它,那么它可以工作,但我不明白为什么它不在其中(与惰性评估有关?)。

传递给 deleteIDs

s3pathids 参数实际上分别不是字符串和集合。它们是列。

为了对这些值进行操作,您可以创建一个接受列而不是内部类型的 UDF,或者您可以收集足够小的数据集,以便您可以使用 deleteIDs 直接发挥作用。如果您想利用 Spark 的并行性,前者可能是您的最佳选择。

您可以阅读有关 UDF 的内容here

你得到一个 NullPointerException 因为你试图从执行者那里检索你的 spark 会话。

它不是显式的,但要执行 spark 操作,您的 DeleteIDs 函数需要检索活动的 spark 会话。为此,它从 SparkSession 对象调用方法 getActiveSession。但是当从执行者调用时,这个 getActiveSession 方法 returns NoneSparkSession's source code:

中所述

Returns the default SparkSession that is returned by the builder.

Note: Return None, when calling this function on executors

因此当您的代码开始使用此 None spark 会话时抛出 NullPointerException

更一般地说,您不能重新创建数据集并在另一个数据集的转换中使用 spark transformations/actions。

所以我看到了两个解决你问题的方法:

  • 要么在不使用 spark 的情况下重写 DeleteIDs 函数的代码,要么使用 parquet4s 修改您的 parquet 文件。
  • 或将 filesToModify 转换为 Scala 集合并使用 Scala 的 map 而不是 Spark 的集合。