在 spark socket 数据集中读取 .tar.gz 文件

Read .tar.gz file in spark socket dataset

读取 .tar.gz 文件作为 spark 流中的文件流已经足够好了。 但我的要求是,我们可以在 spark-streaming 中读取来自 socketserver 的 .tar.gz 文件吗?

什么时候post tar 套接字中的文件像

nc -l 9990 < input.tar.gz

我可以像

一样在spark中阅读它
Dataset<org.apache.spark.sql.Row> dataset = ss.readStream()
                     .option("host", "localhost")
                     .option("port", "9990")
                     .format("socket")
                     .load();

如果我尝试以 csv 格式存储结果

dataset
               .coalesce(1)
               .writeStream()
               .format("csv")
               .outputMode(OutputMode.Append())
               .option("checkpointLocation", "/tmp/checkpoint")
               .option("path", "hdfs://10.15.xxx.xxx:8020/user/admin/sftp/tar_output4")
               .start()
               .awaitTermination();

我在输出 csv 中得到类似二进制格式的输出

0000000:    22 ef bf bd 08 00 00 00 00 00 00 00 ef bf bd ef   "...............
0000010:    bf bd 3b 0e ef bf bd 40 0c ef bf bd e1 ad 91 ef   ..;....@........
0000020:    bf bd 03 07 ef bf bd 18 ef bf bd 65 09 c7 a1 ef   ...........e....
0000030:    bf bd 41 01 ef bf bd 3a 7f 5c 22 ef bf bd 24 5b   ..A....:.\"...$[
0000040:    21 ef bf bd 2a ef bf bd ef bf bd 35 1e ef bf bd   !...*......5....
0000050:    ef bf bd ef bf bd 56 5b 1b 1e 6d ef bf bd ef bf   ......V[..m.....
0000060:    bd ef bf bd ca a5 21 ef bf bd ef bf bd ef bf bd   ......!.........
0000070:    ef bf bd 0e d1 ac ef bf bd 77 47 36 0b ef bf bd   .........wG6....
0000080:    77 ef bf bd 12 ef bf bd ef bf bd ef bf bd 2d 6b   w.............-k
0000090:    37 3b ef bf bd 1f ef bf bd d7 95 3b ef bf bd 73   7;.........;...s
00000a0:    37 ef bf bd 4b 6d 59 ef bf bd 72 ef bf bd 3c ef   7...KmY...r...<.
00000b0:    bf bd 04 ef bf bd ca b3 7f 17 02 00 00 00 00 00   ................
00000c0:    00 00 00 00 00 00 00 00 ef bf bd ef bf bd 06 59   ...............Y
00000d0:    e9 a9 85 00 28 22 0d 0a   ....("..

任何人都请帮助我..

谢谢

您读取二进制文件(存档)并将输出写入 csv 文件而无需任何转换。这就是为什么您的输出 csv 包含二进制信息,它仍然是存档,但已写入 csv 文件。 据我所知,对于结构化流的二进制文件没有 out-of-the-box 解决方案。 检查 structured streaming programming guide 所有输入和输出模式。

我写了一个TGZ编解码器并注册了。我用它来读取 TGZ 文件作为 text/csv 文件。 也许这有帮助:

final class DecompressTgzCodec extends CompressionCodec {
  override def getDefaultExtension: String = ".tgz"

  override def createOutputStream(out: OutputStream): CompressionOutputStream = ???

  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???

  override def createCompressor(): Compressor = ???

  override def getCompressorType: Class[_ <: Compressor] = ???

  override def createInputStream(in: InputStream): CompressionInputStream = {
    new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
  }

  override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)

  // we do not create any decompressor
  override def createDecompressor(): Decompressor = null

  // we do not create any decompressor
  override def getDecompressorType: Class[_ <: Decompressor] = null


  // see https://www.conductor.com/nightlight/how-to-build-a-speedy-custom-compression-codec-for-hadoop/
  final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
    def updateStream(): Unit = {
      // still have data in stream -> done
      if (in.available() <= 0) {
        // create stream content from following tar elements one by one
        in.getNextTarEntry
      }
    }

    override def read: Int = {
      checkStream()
      updateStream()
      in.read()
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      checkStream()
      updateStream()
      in.read(b, off, len)
    }

    override def resetState(): Unit = {
    }
  }
}
...
// register the new codec:
    val conf = new SparkConf()
    conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)

    val spark = SparkSession
      .builder()
      .master(...)
      .config(conf)
      .appName("Test")
      .getOrCreate()