Spark:将 DataFrame 写入压缩 JSON
Spark: writing DataFrame as compressed JSON
Apache Spark 的 DataFrameReader.json()
可以自动处理压缩的 JSONlines 文件,但似乎没有办法让 DataFrameWriter.json()
写入压缩的 JSONlines 文件。额外的网络 I/O 在云中非常昂贵。
有办法解决这个问题吗?
以下解决方案使用 pyspark,但我认为 Scala 中的代码会类似。
第一个选项是在初始化 SparkConf 时设置以下内容:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
使用上面的代码,您使用 sparkContext 生成的任何文件都会使用 gzip 自动压缩。
第二个选项,如果您只想压缩上下文中选定的文件。假设 "df" 是您的数据框,文件名是您的目的地:
df_rdd = self.df.toJSON()
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
使用 Spark 2.X(也许更早,我没有测试)有一种更简单的方法来编写压缩 JSON,不需要更改配置:
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")
这也适用于 CSV 和 Parquet,只需使用 .csv() 和 .parquet() 而不是 .json() 在设置压缩选项后写入文件。
可能的编解码器是:none、bzip2、deflate、gzip、lz4 和 snappy。
在 SparkConf
上设置压缩选项 不是 一个好的做法,作为公认的答案。它改变了 全局 的行为,而不是在每个文件的基础上指示设置。事实上,显式 总是 比隐式更好。在某些情况下,用户无法轻松操作上下文配置,例如 spark-shell 或设计为另一个子模块的代码。
正确的方法
自 Spark 1.4 起支持压缩写入 DataFrame
。实现该目标的几种方法:
一个
df.write.json("filename.json", compression="gzip")
就是这样!随便用DataFrameWriter.json()
就可以了。
魔法就藏在代码里pyspark/sql/readwriter.py
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
:param path: the path in any Hadoop supported file system
:param mode: ...
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param dateFormat: ...
:param timestampFormat: ...
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
支持的压缩格式有bzip2、gzip、lz4、snappy和deflate,不区分大小写。
scala API 应该是一样的。
另一个
df.write.options(compression="gzip").json("filename.json")
同上。可以提供更多选项作为关键字参数。自 Spark 1.4 起可用。
第三
df.write.option("compression", "gzip").json("filename.json")
DataFrameWriter.option()
从 Spark 1.5 开始添加。一次只能添加一个参数。
Apache Spark 的 DataFrameReader.json()
可以自动处理压缩的 JSONlines 文件,但似乎没有办法让 DataFrameWriter.json()
写入压缩的 JSONlines 文件。额外的网络 I/O 在云中非常昂贵。
有办法解决这个问题吗?
以下解决方案使用 pyspark,但我认为 Scala 中的代码会类似。
第一个选项是在初始化 SparkConf 时设置以下内容:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
使用上面的代码,您使用 sparkContext 生成的任何文件都会使用 gzip 自动压缩。
第二个选项,如果您只想压缩上下文中选定的文件。假设 "df" 是您的数据框,文件名是您的目的地:
df_rdd = self.df.toJSON()
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
使用 Spark 2.X(也许更早,我没有测试)有一种更简单的方法来编写压缩 JSON,不需要更改配置:
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")
这也适用于 CSV 和 Parquet,只需使用 .csv() 和 .parquet() 而不是 .json() 在设置压缩选项后写入文件。
可能的编解码器是:none、bzip2、deflate、gzip、lz4 和 snappy。
在 SparkConf
上设置压缩选项 不是 一个好的做法,作为公认的答案。它改变了 全局 的行为,而不是在每个文件的基础上指示设置。事实上,显式 总是 比隐式更好。在某些情况下,用户无法轻松操作上下文配置,例如 spark-shell 或设计为另一个子模块的代码。
正确的方法
自 Spark 1.4 起支持压缩写入 DataFrame
。实现该目标的几种方法:
一个
df.write.json("filename.json", compression="gzip")
就是这样!随便用DataFrameWriter.json()
就可以了。
魔法就藏在代码里pyspark/sql/readwriter.py
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
:param path: the path in any Hadoop supported file system
:param mode: ...
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param dateFormat: ...
:param timestampFormat: ...
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
支持的压缩格式有bzip2、gzip、lz4、snappy和deflate,不区分大小写。
scala API 应该是一样的。
另一个
df.write.options(compression="gzip").json("filename.json")
同上。可以提供更多选项作为关键字参数。自 Spark 1.4 起可用。
第三
df.write.option("compression", "gzip").json("filename.json")
DataFrameWriter.option()
从 Spark 1.5 开始添加。一次只能添加一个参数。