Apache Flink 的 BZip2 压缩输入
BZip2 compressed input for Apache Flink
我有一个用 bzip2 压缩的维基百科转储(从 http://dumps.wikimedia.org/enwiki/ 下载),但我不想解压它:我想在即时解压的同时处理它。
我知道可以用普通的 Java 来做到这一点(参见例如 Java - Read BZ2 file and uncompress/parse on the fly), but I was wondering how do it in Apache Flink? What I probably need is something like https://github.com/whym/wikihadoop 但对于 Flink,而不是 Hadoop。
Apache Flink 可以读取以下格式的压缩文件:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
从包名可以看出,Flink 使用的是 Hadoop 的 InputFormats。
这是一个使用 Flink 的 Scala API 读取 gz 文件的例子:
(至少需要 Flink 0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
Apache Flink 仅内置支持 .deflate 文件。添加对更多压缩编解码器的支持很容易,但尚未完成。
在 Flink 中使用 HadoopInputFormats 不会造成任何性能损失。 Flink 内置了对 Hadoop 的 Writable
类型的序列化支持。
我有一个用 bzip2 压缩的维基百科转储(从 http://dumps.wikimedia.org/enwiki/ 下载),但我不想解压它:我想在即时解压的同时处理它。
我知道可以用普通的 Java 来做到这一点(参见例如 Java - Read BZ2 file and uncompress/parse on the fly), but I was wondering how do it in Apache Flink? What I probably need is something like https://github.com/whym/wikihadoop 但对于 Flink,而不是 Hadoop。
Apache Flink 可以读取以下格式的压缩文件:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
从包名可以看出,Flink 使用的是 Hadoop 的 InputFormats。 这是一个使用 Flink 的 Scala API 读取 gz 文件的例子: (至少需要 Flink 0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
Apache Flink 仅内置支持 .deflate 文件。添加对更多压缩编解码器的支持很容易,但尚未完成。
在 Flink 中使用 HadoopInputFormats 不会造成任何性能损失。 Flink 内置了对 Hadoop 的 Writable
类型的序列化支持。