Spark 数据集 - 每行的 "edit" 个镶木地板文件
Spark Dataset - "edit" parquet file for each row
上下文
我正在尝试使用 Spark/Scala 以有效地“编辑”多个镶木地板文件(可能超过 50k+)。唯一需要完成的编辑是根据给定的一组行 ID 进行删除(即删除 records/rows)。
parquet 文件作为分区 DataFrame 存储在 s3 中,示例分区如下所示:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
每个分区最多可以有 100 个 parquet 文件,每个文件的大小在 50mb 到 500mb 之间。
输入
我们得到一个名为 filesToModify
的火花 Dataset[MyClass]
,它有 2 列:
s3path: String
= s3 中需要编辑的 parquet 文件的完整 s3 路径
ids: Set[String]
= 位于s3path
的parquet文件中需要删除的一组ID(行)
示例输入数据集 filesToModify
:
s3path
ids
s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet
Set("a", "b")
s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet
Set("b")
预期行为
给定 filesToModify
我想利用 Spark 中的并行性对每个 row
:
执行以下操作
- 加载位于
row.s3path
的parquet文件
- 过滤,以便我们排除
id
在集合 row.ids
中的任何行
- 计算
row.ids
中每个 ID 的 deleted/excluded 行数(可选)
- 将过滤后的数据存回原处
row.s3path
覆盖文件
- Return删除的行数(可选)
我试过的
我试过使用 filesToModify.map(row => deleteIDs(row.s3path, row.ids))
,其中 deleteIDs
看起来像这样:
def deleteIDs(s3path: String, ids: Set[String]): Int = {
import spark.implicits._
val data = spark
.read
.parquet(s3path)
.as[DataModel]
val clean = data
.filter(not(col("id").isInCollection(ids)))
// write to a temp directory and then upload to s3 with same
// prefix as original file to overwrite it
writeToSingleFile(clean, s3path)
1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}
然而,当在 map
操作中执行时,这会导致 NullPointerException
。如果我在 map
块之外单独执行它,那么它可以工作,但我不明白为什么它不在其中(与惰性评估有关?)。
传递给 deleteIDs
的 s3path
和 ids
参数实际上分别不是字符串和集合。它们是列。
为了对这些值进行操作,您可以创建一个接受列而不是内部类型的 UDF,或者您可以收集足够小的数据集,以便您可以使用 deleteIDs
直接发挥作用。如果您想利用 Spark 的并行性,前者可能是您的最佳选择。
您可以阅读有关 UDF 的内容here
你得到一个 NullPointerException
因为你试图从执行者那里检索你的 spark 会话。
它不是显式的,但要执行 spark 操作,您的 DeleteIDs
函数需要检索活动的 spark 会话。为此,它从 SparkSession
对象调用方法 getActiveSession
。但是当从执行者调用时,这个 getActiveSession
方法 returns None
如 SparkSession's source code:
中所述
Returns the default SparkSession that is returned by the builder.
Note: Return None, when calling this function on executors
因此当您的代码开始使用此 None
spark 会话时抛出 NullPointerException
。
更一般地说,您不能重新创建数据集并在另一个数据集的转换中使用 spark transformations/actions。
所以我看到了两个解决你问题的方法:
- 要么在不使用 spark 的情况下重写
DeleteIDs
函数的代码,要么使用 parquet4s 修改您的 parquet 文件。
- 或将
filesToModify
转换为 Scala 集合并使用 Scala 的 map
而不是 Spark 的集合。
上下文
我正在尝试使用 Spark/Scala 以有效地“编辑”多个镶木地板文件(可能超过 50k+)。唯一需要完成的编辑是根据给定的一组行 ID 进行删除(即删除 records/rows)。
parquet 文件作为分区 DataFrame 存储在 s3 中,示例分区如下所示:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
每个分区最多可以有 100 个 parquet 文件,每个文件的大小在 50mb 到 500mb 之间。
输入
我们得到一个名为 filesToModify
的火花 Dataset[MyClass]
,它有 2 列:
s3path: String
= s3 中需要编辑的 parquet 文件的完整 s3 路径ids: Set[String]
= 位于s3path
的parquet文件中需要删除的一组ID(行)
示例输入数据集 filesToModify
:
s3path | ids |
---|---|
s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | Set("a", "b") |
s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | Set("b") |
预期行为
给定 filesToModify
我想利用 Spark 中的并行性对每个 row
:
- 加载位于
row.s3path
的parquet文件
- 过滤,以便我们排除
id
在集合row.ids
中的任何行
- 计算
row.ids
中每个 ID 的 deleted/excluded 行数(可选) - 将过滤后的数据存回原处
row.s3path
覆盖文件 - Return删除的行数(可选)
我试过的
我试过使用 filesToModify.map(row => deleteIDs(row.s3path, row.ids))
,其中 deleteIDs
看起来像这样:
def deleteIDs(s3path: String, ids: Set[String]): Int = {
import spark.implicits._
val data = spark
.read
.parquet(s3path)
.as[DataModel]
val clean = data
.filter(not(col("id").isInCollection(ids)))
// write to a temp directory and then upload to s3 with same
// prefix as original file to overwrite it
writeToSingleFile(clean, s3path)
1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}
然而,当在 map
操作中执行时,这会导致 NullPointerException
。如果我在 map
块之外单独执行它,那么它可以工作,但我不明白为什么它不在其中(与惰性评估有关?)。
deleteIDs
的 s3path
和 ids
参数实际上分别不是字符串和集合。它们是列。
为了对这些值进行操作,您可以创建一个接受列而不是内部类型的 UDF,或者您可以收集足够小的数据集,以便您可以使用 deleteIDs
直接发挥作用。如果您想利用 Spark 的并行性,前者可能是您的最佳选择。
您可以阅读有关 UDF 的内容here
你得到一个 NullPointerException
因为你试图从执行者那里检索你的 spark 会话。
它不是显式的,但要执行 spark 操作,您的 DeleteIDs
函数需要检索活动的 spark 会话。为此,它从 SparkSession
对象调用方法 getActiveSession
。但是当从执行者调用时,这个 getActiveSession
方法 returns None
如 SparkSession's source code:
Returns the default SparkSession that is returned by the builder.
Note: Return None, when calling this function on executors
因此当您的代码开始使用此 None
spark 会话时抛出 NullPointerException
。
更一般地说,您不能重新创建数据集并在另一个数据集的转换中使用 spark transformations/actions。
所以我看到了两个解决你问题的方法:
- 要么在不使用 spark 的情况下重写
DeleteIDs
函数的代码,要么使用 parquet4s 修改您的 parquet 文件。 - 或将
filesToModify
转换为 Scala 集合并使用 Scala 的map
而不是 Spark 的集合。