使用 Spark 压缩文件
Gzip files with Spark
我有一个 Spark 作业,它将数千个文件作为输入并从 Amazon S3 下载它们并在映射阶段处理它们,其中每个映射步骤 returns 一个字符串。我想将输出压缩到 .tar.gz
文件,然后将其上传到 S3。一种方法是
outputs = sc.map(filenames).collect()
for output in outputs:
with tempfile.NamedTemporaryFile() as tar_temp:
tar = tarfile.open(tar_temp.name, "w:gz")
for output in outputs:
with tempfile.NamedTemporaryFile() as output_temp:
output_temp.write(output)
tar.add(output_temp.name)
tar.close()
问题是 outputs
不适合内存(但适合磁盘)。有没有办法在映射阶段将输出保存到主文件系统?或者也许使用循环 for output in outputs
作为生成器,这样我就不必将所有内容加载到内存中?
在 Spark 1.3.0 中,您将能够在 Python 中使用相同的 Java/Scala 方法 toLocalIterator
。
拉取请求已合并:https://github.com/apache/spark/pull/4237
这里是指定文档:
"""
Return an iterator that contains all of the elements in this RDD.
The iterator will consume as much memory as the largest partition in this RDD.
>>> rdd = sc.parallelize(range(10))
>>> [x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
总而言之,它将允许您迭代输出,而无需将所有内容收集到驱动程序。
此致,
我有一个 Spark 作业,它将数千个文件作为输入并从 Amazon S3 下载它们并在映射阶段处理它们,其中每个映射步骤 returns 一个字符串。我想将输出压缩到 .tar.gz
文件,然后将其上传到 S3。一种方法是
outputs = sc.map(filenames).collect()
for output in outputs:
with tempfile.NamedTemporaryFile() as tar_temp:
tar = tarfile.open(tar_temp.name, "w:gz")
for output in outputs:
with tempfile.NamedTemporaryFile() as output_temp:
output_temp.write(output)
tar.add(output_temp.name)
tar.close()
问题是 outputs
不适合内存(但适合磁盘)。有没有办法在映射阶段将输出保存到主文件系统?或者也许使用循环 for output in outputs
作为生成器,这样我就不必将所有内容加载到内存中?
在 Spark 1.3.0 中,您将能够在 Python 中使用相同的 Java/Scala 方法 toLocalIterator
。
拉取请求已合并:https://github.com/apache/spark/pull/4237
这里是指定文档:
"""
Return an iterator that contains all of the elements in this RDD.
The iterator will consume as much memory as the largest partition in this RDD.
>>> rdd = sc.parallelize(range(10))
>>> [x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
总而言之,它将允许您迭代输出,而无需将所有内容收集到驱动程序。
此致,