如何在流式数据集中加载 tar.gz 个文件?

How to load tar.gz files in streaming datasets?

我想从 tar-gzip 文件 (tgz) 进行流式传输,其中包括我实际的 CSV 存储数据。

当我的数据以 CSV 文件形式出现时,我已经设法使用 spark 2.2 进行结构化流式传输,但实际上,数据以 gzip 压缩的 csv 文件形式出现。

有没有办法让结构化流式处理的触发器在处理 CSV 流之前进行解压缩?

我用来处理文件的代码是这样的:

val schema = Encoders.product[RawData].schema
val trackerData = spark
  .readStream
  .option("delimiter", "\t")
  .schema(schema)
  .csv(path)
val exceptions = rawCientData
  .as[String]
  .flatMap(extractExceptions)
  .as[ExceptionData]

当路径指向 csv 文件时,产生了预期的输出。 但我想使用 tar gzip 文件。 当我尝试将这些文件放在给定路径时,我没有收到任何异常,批处理输出告诉我

  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 1095,
    "processedRowsPerSecond" : 211.0233185584891
  } ],

但我没有处理任何实际数据。 控制台接收器看起来像这样:

+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+

认为在 Spark 中读取 tar.gz 编辑的文件是可能的(请参阅 or gzip support in Spark 了解一些想法)。

Spark 确实支持 gzip 文件,但不推荐使用它们,因为它们不可拆分并且会导致单个分区(这反过来使 Spark 几乎没有帮助)。

为了在 Spark Structured Streaming 中加载 gzip 文件,您必须指定路径模式,以便文件包含在加载中,比如 zsessionlog*.csv.gz 或类似的。否则,csv 单独仅加载 CSV 文件。

如果您坚持使用 Spark Structured Streaming 来处理 tar.gz'ed 文件,您可以编写自定义流数据 Source 来执行非 tar.gz.

鉴于 gzip 文件不推荐作为 Spark 中的数据格式,使用 Spark Structured Streaming 的整个想法没有多大意义。

我通过这种方式解决了读取 .tar.gz (.tgz) 文件的部分: 受此启发 site 我创建了自己的 TGZ 编解码器

final class DecompressTgzCodec extends CompressionCodec {
  override def getDefaultExtension: String = ".tgz"

  override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
  override def createCompressor(): Compressor = ???
  override def getCompressorType: Class[_ <: Compressor] = ???

  override def createInputStream(in: InputStream): CompressionInputStream = {
    new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
  }
  override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)

  override def createDecompressor(): Decompressor = null
  override def getDecompressorType: Class[_ <: Decompressor] = null

  final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
    def updateStream(): Unit = {
      // still have data in stream -> done
      if (in.available() <= 0) {
        // create stream content from following tar elements one by one
        in.getNextTarEntry()
      }
    }

    override def read: Int = {
      checkStream()
      updateStream()
      in.read()
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      checkStream()
      updateStream()
      in.read(b, off, len)
    }

    override def resetState(): Unit = {}
  }
}

并注册供spark使用。

val conf = new SparkConf()
conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(conf)
  .appName("Streaming Example")
  .getOrCreate()

完全按照我想要的方式工作。