如何加快批量导入 google 多个工作人员的云数据存储?

How to speedup bulk importing into google cloud datastore with multiple workers?

我有一个基于 apache-beam 的数据流作业要使用 vcf source from a single text file (stored in google cloud storage), transform text lines into datastore Entities and write them into the datastore sink 读取。工作流程工作正常,但我注意到的缺点是:

我没有使用键的祖先路径;所有实体都相同 kind.

管道代码如下所示:

def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
  (p
   | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
   | 'Create my entity' >> beam.ParDo(
     ToEntityFn(), user_options.kind)
   | 'Write to datastore' >> WriteToDatastore(project))

因为我有数百万行要写入数据存储,所以以 30 entities/sec 的速度写入会花费太长时间。

问题:输入只是一个巨大的压缩文件。是否需要拆分成多个小文件来触发多个worker?有没有其他方法可以加快导入速度?我是否遗漏了 num_workers 设置中的某些内容?谢谢!

我对apache beam不熟悉,从一般流程的角度回答。

假设各个输入文件部分中的实体数据之间没有依赖关系需要考虑,那么是的,使用多个输入文件肯定会有所帮助,因为所有这些文件都可以虚拟地并行处理(当然,取决于可用工人的最大数量)。

可能不需要事先拆分巨大的 zip 文件,可以简单地将单个输入数据流的段交给单独的数据段工作者进行写入,如果与实际数据段处理相比,这种切换本身的开销可以忽略不计。

整体性能限制是读取输入数据、将其拆分为段并移交给段数据工作者的速度。

数据段工作者会将其接收的数据段进一步拆分为更小的块,最多相当于最多 500 个实体,这些实体可以在单个批处理操作中转换为实体并写入数据存储。根据所使用的数据存储客户端库,可以异步执行此操作,从而允许继续拆分为块并转换为实体,而无需等待先前的数据存储写入完成。

数据段工作器的性能限制将是数据段可以拆分为块并将块转换为实体的速度

如果异步操作不可用或需要更高的吞吐量,则可以将每个块再移交给段工作程序,由段工作程序执行到实体的转换和数据存储批量写入。

数据段工作者级别的性能限制将只是数据段可以拆分成块并移交给块工作者的速度。

通过这种方法,实际转换为实体并将它们批量写入数据存储(异步或非异步)将不再位于拆分输入数据流的关键路径中,我认为这是性能限制你目前的做法。

我研究了 vcfio. I suspect (if I understand correctly) that the reason I always get one worker when the input is a single file is due to the limit of the _VcfSource and the VCF format 约束的设计。这种格式有一个 header 部分定义了如何翻译 non-header 行。这导致读取源文件的每个工作人员都必须处理整个文件。当我将单个文件拆分为共享相同 header 的 5 个单独文件时,我成功地获得了最多 5 个工作人员(但可能由于相同的原因不再增加)。

我不明白的一件事是读取的工人数量可以限制为 5(在这种情况下)。但是为什么我们被限制为只有 5 个 worker 可以写呢?不管怎样,我想我已经找到了使用 beam Dataflow-Runner 触发多个 worker 的替代方法(使用 pre-split VCF 文件 )。 gcp variant transforms project 中也有一个相关的方法,其中 vcfio 得到了显着扩展。它似乎支持使用单个输入 vcf 文件的多个工作人员。我希望该项目中的更改也可以合并到梁项目中。