在 Spark 中对 RDD 执行分组并将每个组写入单独的 Parquet 文件

Perform group by on RDD in Spark and write each group as individual Parquet file

我在内存中有一个RDD。我想使用一些任意函数对 RDD 进行分组,然后将每个单独的组作为单独的 Parquet 文件写出。

例如,如果我的 RDD 由 JSON 个形式的字符串组成:

{"type":"finish","resolution":"success","csr_id": 214}
{"type":"create","resolution":"failure","csr_id": 321}
{"type":"action","resolution":"success","csr_id": 262}

我想按 "type" 属性 对 JSON 字符串进行分组,并将具有相同 "type" 的每组字符串写入同一个 Parquet 文件。

我可以看到 DataFrame API 支持如下写出 Parquet 文件(例如,如果 RDD 由 JSON 个字符串组成):

final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);

这意味着整个 DataFrame 都写入了 Parquet 文件,因此 Parquet 文件将包含 "type" 属性.

具有不同值的记录

Dataframe API 还提供了一个 groupBy 函数:

final GroupedData groupedData = dataFrame.groupBy(this::myFunction);

但是 GroupedData API 似乎没有提供任何将每个组写入单个文件的功能。

有什么想法吗?

您不能写入 GroupedData 但您可以在写入时对数据进行分区:

dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")

每种类型都将以${column}=${value}格式写入自己的目录。这些可以单独加载:

sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// |   262|   success|
// +------+----------+