PySpark HDFS 数据流 reading/writing

PySpark HDFS data streams reading/writing

我有一个包含多个文件的 HDFS 目录,我想合并为一个。我不想使用 Spark DF 来执行此操作,而是使用数据流与 HDFS 交互。到目前为止,这是我的代码:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    buffer = bytes(256)
    in_stream = hdfs.open(f.getPath())  # FSDataInputStream

    bytesRead = in_stream.read(buffer)
    while (bytesRead > 0):
        out_stream.writeBytes(bytesRead)
        out_stream.flush()
    in_stream.close()

out_stream.close()

这段代码的第一个问题是,我不确定如何通过缓冲区从输入流中读取数据。第一个问题是,输出文件是在 HDFS 中创建的,但没有写入任何内容(即使我向其中写入固定值)。

经过一番调查,我找到了解决问题的办法。解决方案涉及通过 spark 上下文创建一些 JVM 对象并将它们用于缓冲 i/o 操作:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    raw_in = hdfs.open(f.getPath())  # FSDataInputStream
    in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)

    while in_stream.available() > 0:
        out_stream.write(in_stream.read()) 
        out_stream.flush()
    in_stream.close()
out_stream.close()