使用 Spark 压缩文件

Gzip files with Spark

我有一个 Spark 作业,它将数千个文件作为输入并从 Amazon S3 下载它们并在映射阶段处理它们,其中每个映射步骤 returns 一个字符串。我想将输出压缩到 .tar.gz 文件,然后将其上传到 S3。一种方法是

outputs = sc.map(filenames).collect()
for output in outputs:
    with tempfile.NamedTemporaryFile() as tar_temp:
        tar = tarfile.open(tar_temp.name, "w:gz")
        for output in outputs:
            with tempfile.NamedTemporaryFile() as output_temp:
                output_temp.write(output)
                tar.add(output_temp.name)
        tar.close()

问题是 outputs 不适合内存(但适合磁盘)。有没有办法在映射阶段将输出保存到主文件系统?或者也许使用循环 for output in outputs 作为生成器,这样我就不必将所有内容加载到内存中?

在 Spark 1.3.0 中,您将能够在 Python 中使用相同的 Java/Scala 方法 toLocalIterator

拉取请求已合并:https://github.com/apache/spark/pull/4237

这里是指定文档:

    """
    Return an iterator that contains all of the elements in this RDD.
    The iterator will consume as much memory as the largest partition in this RDD.
    >>> rdd = sc.parallelize(range(10))
    >>> [x for x in rdd.toLocalIterator()]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    """

总而言之,它将允许您迭代输出,而无需将所有内容收集到驱动程序。

此致,