遍历文件列表,提取它们的内容? (SparkContext 错误)

Iterate over a list of files, extracting their contents? (SparkContext error)

我需要遍历磁盘上的大量文件,打开每个文件并对其进行解析。我有一个包含文件名的文件,我 需要遍历这些文件名。

我将此函数传递给 map():

%python

def parse(filename):
  try:
    tf = sc.textFile(filename)
    # run parsing code, produce text
    return text
  except:
      return None

当我尝试 运行 以下内容时:

parsed_contents = filenames.map(parse)
parsed_contents.top(5)

我收到此错误:

异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。

如果我 运行 单独指定一个文件名,try 块中的代码就可以工作。

我应该如何遍历指定的文件列表,提取它们的内容?

当您对 rdd 执行转换时(在本例中是您的调用 filnames.map(parse)),驱动程序会分配 worker 来处理您的 rdd 的每个分区。因此,您的 map 调用实质上是发送给工作人员以应用于您的 rdd。在您提供的代码中,您基本上是从工作人员的 运行 代码调用 sparkContext 实例,这会导致错误。需要在驱动进程上读取文件。

sc.textFile 接受逗号分隔的字符串,指定要读入的文件名。 所以你可以这样做:

filenames = sc.textFile("filesToRead.txt")

parsed_contents = sc.textFile(",".join(filenames.collect()))

parsed_contents.top(5)

您还可以将模式指定为 sc.textFile 方法的输入。例如,

parsed_contents = sc.textFile("file[0-5].txt")

更新 用于过滤磁盘上存在的文件。

def check_exists(name):
    try:
        open(name, 'r')
        True
    except:
        False

existingFiles = filenames.filter(check_exists)