遍历文件列表,提取它们的内容? (SparkContext 错误)
Iterate over a list of files, extracting their contents? (SparkContext error)
我需要遍历磁盘上的大量文件,打开每个文件并对其进行解析。我有一个包含文件名的文件,我 仅 需要遍历这些文件名。
我将此函数传递给 map():
%python
def parse(filename):
try:
tf = sc.textFile(filename)
# run parsing code, produce text
return text
except:
return None
当我尝试 运行 以下内容时:
parsed_contents = filenames.map(parse)
parsed_contents.top(5)
我收到此错误:
异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。
如果我 运行 单独指定一个文件名,try 块中的代码就可以工作。
我应该如何遍历指定的文件列表,提取它们的内容?
当您对 rdd 执行转换时(在本例中是您的调用 filnames.map(parse)
),驱动程序会分配 worker 来处理您的 rdd 的每个分区。因此,您的 map 调用实质上是发送给工作人员以应用于您的 rdd。在您提供的代码中,您基本上是从工作人员的 运行 代码调用 sparkContext
实例,这会导致错误。需要在驱动进程上读取文件。
sc.textFile
接受逗号分隔的字符串,指定要读入的文件名。
所以你可以这样做:
filenames = sc.textFile("filesToRead.txt")
parsed_contents = sc.textFile(",".join(filenames.collect()))
parsed_contents.top(5)
您还可以将模式指定为 sc.textFile
方法的输入。例如,
parsed_contents = sc.textFile("file[0-5].txt")
更新
用于过滤磁盘上存在的文件。
def check_exists(name):
try:
open(name, 'r')
True
except:
False
existingFiles = filenames.filter(check_exists)
我需要遍历磁盘上的大量文件,打开每个文件并对其进行解析。我有一个包含文件名的文件,我 仅 需要遍历这些文件名。
我将此函数传递给 map():
%python
def parse(filename):
try:
tf = sc.textFile(filename)
# run parsing code, produce text
return text
except:
return None
当我尝试 运行 以下内容时:
parsed_contents = filenames.map(parse)
parsed_contents.top(5)
我收到此错误:
异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。
如果我 运行 单独指定一个文件名,try 块中的代码就可以工作。
我应该如何遍历指定的文件列表,提取它们的内容?
当您对 rdd 执行转换时(在本例中是您的调用 filnames.map(parse)
),驱动程序会分配 worker 来处理您的 rdd 的每个分区。因此,您的 map 调用实质上是发送给工作人员以应用于您的 rdd。在您提供的代码中,您基本上是从工作人员的 运行 代码调用 sparkContext
实例,这会导致错误。需要在驱动进程上读取文件。
sc.textFile
接受逗号分隔的字符串,指定要读入的文件名。
所以你可以这样做:
filenames = sc.textFile("filesToRead.txt")
parsed_contents = sc.textFile(",".join(filenames.collect()))
parsed_contents.top(5)
您还可以将模式指定为 sc.textFile
方法的输入。例如,
parsed_contents = sc.textFile("file[0-5].txt")
更新 用于过滤磁盘上存在的文件。
def check_exists(name):
try:
open(name, 'r')
True
except:
False
existingFiles = filenames.filter(check_exists)