如何在流式数据集中加载 tar.gz 个文件?
How to load tar.gz files in streaming datasets?
我想从 tar-gzip 文件 (tgz) 进行流式传输,其中包括我实际的 CSV 存储数据。
当我的数据以 CSV 文件形式出现时,我已经设法使用 spark 2.2 进行结构化流式传输,但实际上,数据以 gzip 压缩的 csv 文件形式出现。
有没有办法让结构化流式处理的触发器在处理 CSV 流之前进行解压缩?
我用来处理文件的代码是这样的:
val schema = Encoders.product[RawData].schema
val trackerData = spark
.readStream
.option("delimiter", "\t")
.schema(schema)
.csv(path)
val exceptions = rawCientData
.as[String]
.flatMap(extractExceptions)
.as[ExceptionData]
当路径指向 csv 文件时,产生了预期的输出。
但我想使用 tar gzip 文件。
当我尝试将这些文件放在给定路径时,我没有收到任何异常,批处理输出告诉我
"sources" : [ {
"description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
"startOffset" : null,
"endOffset" : {
"logOffset" : 0
},
"numInputRows" : 1095,
"processedRowsPerSecond" : 211.0233185584891
} ],
但我没有处理任何实际数据。
控制台接收器看起来像这样:
+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+
我不认为在 Spark 中读取 tar.gz 编辑的文件是可能的(请参阅 or gzip support in Spark 了解一些想法)。
Spark 确实支持 gzip 文件,但不推荐使用它们,因为它们不可拆分并且会导致单个分区(这反过来使 Spark 几乎没有帮助)。
为了在 Spark Structured Streaming 中加载 gzip 文件,您必须指定路径模式,以便文件包含在加载中,比如 zsessionlog*.csv.gz
或类似的。否则,csv
单独仅加载 CSV 文件。
如果您坚持使用 Spark Structured Streaming 来处理 tar.gz'ed 文件,您可以编写自定义流数据 Source
来执行非 tar.gz
.
鉴于 gzip 文件不推荐作为 Spark 中的数据格式,使用 Spark Structured Streaming 的整个想法没有多大意义。
我通过这种方式解决了读取 .tar.gz (.tgz) 文件的部分:
受此启发 site 我创建了自己的 TGZ 编解码器
final class DecompressTgzCodec extends CompressionCodec {
override def getDefaultExtension: String = ".tgz"
override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
override def createCompressor(): Compressor = ???
override def getCompressorType: Class[_ <: Compressor] = ???
override def createInputStream(in: InputStream): CompressionInputStream = {
new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
}
override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)
override def createDecompressor(): Decompressor = null
override def getDecompressorType: Class[_ <: Decompressor] = null
final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
def updateStream(): Unit = {
// still have data in stream -> done
if (in.available() <= 0) {
// create stream content from following tar elements one by one
in.getNextTarEntry()
}
}
override def read: Int = {
checkStream()
updateStream()
in.read()
}
override def read(b: Array[Byte], off: Int, len: Int): Int = {
checkStream()
updateStream()
in.read(b, off, len)
}
override def resetState(): Unit = {}
}
}
并注册供spark使用。
val conf = new SparkConf()
conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)
val spark = SparkSession
.builder()
.master("local[*]")
.config(conf)
.appName("Streaming Example")
.getOrCreate()
完全按照我想要的方式工作。
我想从 tar-gzip 文件 (tgz) 进行流式传输,其中包括我实际的 CSV 存储数据。
当我的数据以 CSV 文件形式出现时,我已经设法使用 spark 2.2 进行结构化流式传输,但实际上,数据以 gzip 压缩的 csv 文件形式出现。
有没有办法让结构化流式处理的触发器在处理 CSV 流之前进行解压缩?
我用来处理文件的代码是这样的:
val schema = Encoders.product[RawData].schema
val trackerData = spark
.readStream
.option("delimiter", "\t")
.schema(schema)
.csv(path)
val exceptions = rawCientData
.as[String]
.flatMap(extractExceptions)
.as[ExceptionData]
当路径指向 csv 文件时,产生了预期的输出。 但我想使用 tar gzip 文件。 当我尝试将这些文件放在给定路径时,我没有收到任何异常,批处理输出告诉我
"sources" : [ {
"description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
"startOffset" : null,
"endOffset" : {
"logOffset" : 0
},
"numInputRows" : 1095,
"processedRowsPerSecond" : 211.0233185584891
} ],
但我没有处理任何实际数据。 控制台接收器看起来像这样:
+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+
我不认为在 Spark 中读取 tar.gz 编辑的文件是可能的(请参阅
Spark 确实支持 gzip 文件,但不推荐使用它们,因为它们不可拆分并且会导致单个分区(这反过来使 Spark 几乎没有帮助)。
为了在 Spark Structured Streaming 中加载 gzip 文件,您必须指定路径模式,以便文件包含在加载中,比如 zsessionlog*.csv.gz
或类似的。否则,csv
单独仅加载 CSV 文件。
如果您坚持使用 Spark Structured Streaming 来处理 tar.gz'ed 文件,您可以编写自定义流数据 Source
来执行非 tar.gz
.
鉴于 gzip 文件不推荐作为 Spark 中的数据格式,使用 Spark Structured Streaming 的整个想法没有多大意义。
我通过这种方式解决了读取 .tar.gz (.tgz) 文件的部分: 受此启发 site 我创建了自己的 TGZ 编解码器
final class DecompressTgzCodec extends CompressionCodec {
override def getDefaultExtension: String = ".tgz"
override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
override def createCompressor(): Compressor = ???
override def getCompressorType: Class[_ <: Compressor] = ???
override def createInputStream(in: InputStream): CompressionInputStream = {
new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
}
override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)
override def createDecompressor(): Decompressor = null
override def getDecompressorType: Class[_ <: Decompressor] = null
final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
def updateStream(): Unit = {
// still have data in stream -> done
if (in.available() <= 0) {
// create stream content from following tar elements one by one
in.getNextTarEntry()
}
}
override def read: Int = {
checkStream()
updateStream()
in.read()
}
override def read(b: Array[Byte], off: Int, len: Int): Int = {
checkStream()
updateStream()
in.read(b, off, len)
}
override def resetState(): Unit = {}
}
}
并注册供spark使用。
val conf = new SparkConf()
conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)
val spark = SparkSession
.builder()
.master("local[*]")
.config(conf)
.appName("Streaming Example")
.getOrCreate()
完全按照我想要的方式工作。