在 Spark 2.3.0 中读取 Zstandard 压缩文件

Read Zstandard-compressed file in Spark 2.3.0

从 Spark 2.3.0 开始,Apache Spark 应该支持 Facebook 的 Zstandard 压缩算法 (https://issues.apache.org/jira/browse/SPARK-19112),但我无法真正读取 Zstandard 压缩文件:

$ spark-shell

...

// Short name throws an exception
scala> val events = spark.read.option("compression", "zstd").json("data.zst")
java.lang.IllegalArgumentException: Codec [zstd] is not available. Known codecs are bzip2, deflate, uncompressed, lz4, gzip, snappy, none.

// Codec class can be imported
scala> import org.apache.spark.io.ZStdCompressionCodec
import org.apache.spark.io.ZStdCompressionCodec

// Fully-qualified code class bypasses error, but results in corrupt records
scala> spark.read.option("compression", "org.apache.spark.io.ZStdCompressionCodec").json("data.zst")
res4: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

我需要做什么才能读取这样的文件?

环境是 AWS EMR 5.14.0.

根据 this comment,Spark 2.3.0 中对 Zstandard 的支持仅限于内部和随机输出。

读取或写入 Zstandard 文件使用 Hadoop 的 org.apache.hadoop.io.compress.ZStandardCodec,它在 Hadoop 2.9.0 中引入(2.8.3 包含在 EMR 5.14.0 中)。