如何在 Dataflow 中启用并行读取文件?

How to enable parallel reading of files in Dataflow?

我正在开发一个数据流管道,该管道从 GCS 读取 1000 个文件(每个 50 MB)并对所有文件的行执行一些计算。每个文件都是具有相同结构的 CSV,只是其中的数字不同,我正在计算所有文件中每个单元格的平均值。

管道看起来像这样 (python):

additional_side_inputs = {'key1': 'value1', 'key2': 'value2'}  # etc.

p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
  | 'Read files' >> ReadMatches()
  | 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
  | 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())

FileToRowsFn class 看起来像这样(见下文,省略了一些细节)。 row_id 是第一列,是每一行的唯一键;它在每个文件中只存在一次,因此我可以计算所有文件的平均值。有一些额外的值作为侧输入提供给转换器,它没有显示在下面的方法体内,但仍被实际实现使用。该值是在管道外部创建的字典。我在这里提到它,以防这可能是缺乏并行化的原因。

class FileToRowsFn(beam.DoFn):
  def process(self, file_element, additional_side_inputs):
    with file_element.open() as csv_file:
      for row_id, *values in csv.reader(TextIOWrapper(csv_file, encoding='utf-8')):
        yield row_id, values

AverageCalculatorFn 是带有累加器的典型 beam.CombineFn,它对所有文件中具有相同 row_id 的所有行执行给定行的每个单元格的平均值。

这一切工作正常,但性能和吞吐量存在问题:执行此管道需要 60 多个小时。从监控控制台,我注意到文件是按顺序读取的(每 2 分钟 1 个文件)。我知道读取一个文件可能需要2分钟(每个文件50MB),但我不明白为什么dataflow不分配更多的worker并行读取多个文件。 cpu 保持在 ~2-3%,因为大部分时间花在文件 IO 上,并且 worker 的数量不超过 2(尽管没有设置限制)。

ReadMatches 的输出是 1000 条文件记录,那么为什么数据流不创建大量 FileToRowsFn 实例并将它们分派给新的工作人员,每个工作人员处理一个文件?

有没有办法强制执行这种行为?

这可能是因为您的所有步骤都被数据流运行程序融合为一个步骤。

要使这样的融合包并行化,第一步需要是可并行化的。在您的情况下,这是不可并行化的 glob 扩展。

要使您的管道可并行化,您可以尝试打破融合。这可以通过添加 Reshuffle 转换作为生成许多元素的步骤之一的消费者来完成。

例如,

from apache_beam import Reshuffle

additional_side_inputs = {'key1': 'value1', 'key2': 'value2'}  # etc.

p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
  | 'Read files' >> ReadMatches()
  | 'Reshuffle' >> Reshuffle()
  | 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
  | 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())

如果您使用 Beam 中可用的标准来源之一(例如 textio.ReadFromText())来读取数据,则不必执行此操作。 (不幸的是,我们没有 CSV 源,但 ReadFromText 支持跳过 header 行)。

有关融合优化和防止融合的更多信息,请参见here