使用 Beam/Dataflow 在每个元素上下拉 BigQuery table 架构很慢

Pulling down BigQuery table schema on each element is slow using Beam/Dataflow

以下是我的代码,它读取包含 1000 行的 CSV 文件并将它们写入 BQ table:

    public static void main(String[] args) {
        PipelineOptionsFactory.register(TemplateOptions.class);
        TemplateOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TemplateOptions.class);
        options.setZone("europe-west1-c");
        options.setProject("myProject-dev");
        options.setRunner(DataflowRunner.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("READ", TextIO.read().from("gs://myBucket/LOG_EBM_1000.csv"))
                .apply("TRANSFORM", ParDo.of(new WikiParDo()))
                .apply("WRITE", BigQueryIO.writeTableRows()
                                .to("myProject:DF_TEST.LOG_EBM_PM")
                                .withCreateDisposition(CreateDisposition.CREATE_NEVER).withWriteDisposition(WRITE_APPEND)
                                );
        pipeline.run().waitUntilFinish();
    }

    private static Schema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        // The name for the new dataset and Table
        String datasetId = "DF_TEST";
        String tableId = "LOG_EBM_PM";
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        return bigQuery.getTable(datasetId, tableId).getDefinition().getSchema();
    }

    public interface TemplateOptions extends DataflowPipelineOptions {
        @Description("GCS path of the file to read from")
        ValueProvider<String> getInputFile();
        void setInputFile(ValueProvider<String> value);

        @Description("My custom command line argument.")
        @Default.String("D-FAULT")
        String getMyCustomOption();
        void setMyCustomOption(String myCustomOption);
    }

    private static class WikiParDo extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String[] split = c.element().split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                Field col = getTableSchema().getFields().get(i);
                row.set(col.getName(), split[i]);
            }
            c.output(row);
        }
    }

下面是pipline执行任务时的截图:

如您所见,Transform 任务的 Wall time 为 1:45 minutes.In wall time 的定义是这样写的:

Approximate time spent in this step on initializing, processing data, shuffling data, and terminating, across all threads in all workers. For composite steps, the sum of time spent in the component steps. This estimate helps you identify slow steps.

使用机器类型:n1-highcpu-16.

1000 行,整个流水线的执行总共需要大约 10 分钟

在我最终的 CSV 文件中,我们将有数百万条记录(文件大小约为 2GB),因此管道应该运行得更快。我的管道有什么问题,即使我使用的是高 CPU 机器,它还是那么慢?

您的代码为每一行的每个字段调用 getTableSchema()(对 BigQuery 服务进行 API 调用)- 数千个相同(因此冗余)的 BigQuery API电话。您似乎也在建立与 BigQuery (getService()) 的连接数千次。这就是为什么它很慢。

您可以通过多种方式逐步加快速度:

  • 每个 processElement() 只调用一次:这将加快它的速度(字段数)。
  • 每个 DoFn 实例只调用一次,方法是在 DoFn 的 @Setup 方法中调用并将结果缓存在 transient 实例变量中:这将进一步加快速度几百个。
  • 每个管道只调用一次,方法是在您的主程序中调用并将字段列表作为构造函数参数传递给您的 DoFn:

像这样:

class WikiParDo ... {
  private final List<String> fields;
  WikiParDo(List<String> fields) { this.fields = fields; }
    // in processElement method, use "fields" instead of calling 
    // "getTableSchema"
}

... main program: ...
TableSchema schema = getTableSchema();
List<String> fields = new ArrayList<>();
... populate fields from schema ...

p.apply(...)
 .apply(..., new WikiParDo(fields))