使用 SparkR 编写分区的 parquet 文件

Writing a partitioned parquet file with SparkR

我有两个脚本,一个在 R 中,另一个在 pyspark 中使用输出。为了简单起见,我正在尝试将该功能复制到第一个脚本中。

第二个脚本非常简单——读取一堆 csv 文件并将它们作为分区的 parquet 发出:

spark.read.csv(path_to_csv, header = True) \
     .repartition(partition_column).write \
     .partitionBy(partition_column).mode('overwrite') \
     .parquet(path_to_parquet)

这在 R 中应该同样简单,但我不知道如何匹配 SparkR 中的 partitionBy 功能。到目前为止我已经知道了:

library(SparkR); library(magrittr)
read.df(path_to_csv, 'csv', header = TRUE) %>%
  repartition(col = .$partition_column) %>%
  write.df(path_to_parquet, 'parquet', mode = 'overwrite')

这成功地为 partition_column 的每个值写入了一个 parquet 文件。问题是发出的文件目录结构错误;而 Python 产生类似

的东西
/path/to/parquet/
  partition_column=key1/
    file.parquet.gz
  partition_column=key2/
    file.parquet.gz
  ...

R 只生产

/path/to/parquet/
  file_for_key1.parquet.gz
  file_for_key2.parquet.gz
  ...

我错过了什么吗? SparkR 中的 partitionBy 函数似乎仅指代 window 函数的上下文,我在手册中看不到任何其他可能相关的内容。也许有一种方法可以在 ... 中传递一些东西,但我没有在文档或在线搜索中看到任何示例。

Spark <= 2.x 不支持输出分区。

但是,SparR >= 3.0.0 (SPARK-21291 - R partitionBy API) 将支持它,语法如下:

write.df(
  df, path_to_csv, "parquet", mode = "overwrite",
  partitionBy = "partition_column"
)

因为 corresponding PR 只修改 R 文件,你应该能够修补任何 SparkR 2.x 发行版,如果升级到开发版本不是一个选项:

git clone https://github.com/apache/spark.git
git checkout v2.4.3  # Or whatever branch you use
# https://github.com/apache/spark/commit/cb77a6689137916e64bc5692b0c942e86ca1a0ea
git cherry-pick cb77a6689137916e64bc5692b0c942e86ca1a0ea
R -e "devtools::install('R/pkg')"

在客户端模式下,这应该只在驱动程序节点上需要。

但这些都不是致命的,应该不会引起任何严重的问题。