在 Spark 中对 RDD 执行分组并将每个组写入单独的 Parquet 文件
Perform group by on RDD in Spark and write each group as individual Parquet file
我在内存中有一个RDD。我想使用一些任意函数对 RDD 进行分组,然后将每个单独的组作为单独的 Parquet 文件写出。
例如,如果我的 RDD 由 JSON 个形式的字符串组成:
{"type":"finish","resolution":"success","csr_id": 214}
{"type":"create","resolution":"failure","csr_id": 321}
{"type":"action","resolution":"success","csr_id": 262}
我想按 "type" 属性 对 JSON 字符串进行分组,并将具有相同 "type" 的每组字符串写入同一个 Parquet 文件。
我可以看到 DataFrame API 支持如下写出 Parquet 文件(例如,如果 RDD 由 JSON 个字符串组成):
final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);
这意味着整个 DataFrame 都写入了 Parquet 文件,因此 Parquet 文件将包含 "type" 属性.
具有不同值的记录
Dataframe API 还提供了一个 groupBy 函数:
final GroupedData groupedData = dataFrame.groupBy(this::myFunction);
但是 GroupedData API 似乎没有提供任何将每个组写入单个文件的功能。
有什么想法吗?
您不能写入 GroupedData
但您可以在写入时对数据进行分区:
dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")
每种类型都将以${column}=${value}
格式写入自己的目录。这些可以单独加载:
sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// | 262| success|
// +------+----------+
我在内存中有一个RDD。我想使用一些任意函数对 RDD 进行分组,然后将每个单独的组作为单独的 Parquet 文件写出。
例如,如果我的 RDD 由 JSON 个形式的字符串组成:
{"type":"finish","resolution":"success","csr_id": 214}
{"type":"create","resolution":"failure","csr_id": 321}
{"type":"action","resolution":"success","csr_id": 262}
我想按 "type" 属性 对 JSON 字符串进行分组,并将具有相同 "type" 的每组字符串写入同一个 Parquet 文件。
我可以看到 DataFrame API 支持如下写出 Parquet 文件(例如,如果 RDD 由 JSON 个字符串组成):
final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);
这意味着整个 DataFrame 都写入了 Parquet 文件,因此 Parquet 文件将包含 "type" 属性.
具有不同值的记录Dataframe API 还提供了一个 groupBy 函数:
final GroupedData groupedData = dataFrame.groupBy(this::myFunction);
但是 GroupedData API 似乎没有提供任何将每个组写入单个文件的功能。
有什么想法吗?
您不能写入 GroupedData
但您可以在写入时对数据进行分区:
dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")
每种类型都将以${column}=${value}
格式写入自己的目录。这些可以单独加载:
sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// | 262| success|
// +------+----------+