使用 Pyspark 读取 S3 上的随机文件样本
Read random sample of files on S3 with Pyspark
我在 S3 上有一个包含 1000 个文件的存储桶。每个大约1GB。我想阅读这个文件的随机样本。假设所有文件的 5%。我就是这样做的
fileDF = sqlContext.jsonRDD(self.sc.textFile(self.path).sample(withReplacement=False, fraction=0.05, seed=42).repartition(160))
但上面的代码似乎会读取所有文件然后进行采样。虽然我想获取文件样本并阅读它们。有人可以帮忙吗?
用你喜欢的方法列出路径下的文件,取名字样本,然后使用RDD联合:
import pyspark
import random
sc = pyspark.SparkContext(appName = "Sampler")
file_list = list_files(path)
desired_pct = 5
file_sample = random.sample(file_list, int(len(file_list) * desired_pct / 100))
file_sample_rdd = sc.emptyRDD()
for f in file_sample:
file_sample_rdd = file_sample_rdd.union(sc.textFile(f))
sample_data_rdd = file_sample_rdd.repartition(160)
这是 "list_files" 的一种可能的快速但肮脏的实现方式,它将列出 S3 上 "directory" 下的文件:
import os
def list_files(path, profile = None):
if not path.endswith("/"):
raise Exception("not handled...")
command = 'aws s3 ls %s' % path
if profile is not None:
command = 'aws --profile %s s3 ls %s' % (profile, path)
result = os.popen(command)
_r = result.read().strip().split('\n')
_r = [path + i.strip().split(' ')[-1] for i in _r]
return _r
我在 S3 上有一个包含 1000 个文件的存储桶。每个大约1GB。我想阅读这个文件的随机样本。假设所有文件的 5%。我就是这样做的
fileDF = sqlContext.jsonRDD(self.sc.textFile(self.path).sample(withReplacement=False, fraction=0.05, seed=42).repartition(160))
但上面的代码似乎会读取所有文件然后进行采样。虽然我想获取文件样本并阅读它们。有人可以帮忙吗?
用你喜欢的方法列出路径下的文件,取名字样本,然后使用RDD联合:
import pyspark
import random
sc = pyspark.SparkContext(appName = "Sampler")
file_list = list_files(path)
desired_pct = 5
file_sample = random.sample(file_list, int(len(file_list) * desired_pct / 100))
file_sample_rdd = sc.emptyRDD()
for f in file_sample:
file_sample_rdd = file_sample_rdd.union(sc.textFile(f))
sample_data_rdd = file_sample_rdd.repartition(160)
这是 "list_files" 的一种可能的快速但肮脏的实现方式,它将列出 S3 上 "directory" 下的文件:
import os
def list_files(path, profile = None):
if not path.endswith("/"):
raise Exception("not handled...")
command = 'aws s3 ls %s' % path
if profile is not None:
command = 'aws --profile %s s3 ls %s' % (profile, path)
result = os.popen(command)
_r = result.read().strip().split('\n')
_r = [path + i.strip().split(' ')[-1] for i in _r]
return _r