GCP Dataflow - read CSV file from Storage and write into BigQuery

I have a CSV file in Storage, and I want to read it and write it into a BigQuery table. This is my CSV file, where the first line is the header:

GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114

Here is my code:

    public class StorgeBq {

        public static class StringToRowConverter extends DoFn<String, TableRow> {

            private String[] columnNames;

            private boolean isFirstRow = true;

            @ProcessElement
            public void processElement(ProcessContext c) {
                TableRow row = new TableRow();

                String[] parts = c.element().split(",");

                if (isFirstRow) {
                    columnNames = Arrays.copyOf(parts, parts.length);
                    isFirstRow = false;
                } else {
                    for (int i = 0; i < parts.length; i++) {
                        row.set(columnNames[i], parts[i]);
                    }
                    c.output(row);
                }
            }
        }

        public static void main(String[] args) {

            DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(DataflowPipelineOptions.class);
            options.setZone("europe-west1-c");
            options.setProject("mydata-dev");
            options.setRunner(DataflowRunner.class);
            Pipeline p = Pipeline.create(options);

            p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
            .apply("ConverToBqRow",ParDo.of(new StringToRowConverter()))
            .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
                    .to("mydata-dev:DF_TEST.dataflow_table")
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER));
            p.run().waitUntilFinish();
        }

    }

There are some problems:

1) When the job starts executing, I see a step called "DropInputs" which I did not define anywhere in my code!! It also starts running before all the other tasks. Why??

2) Why doesn't the pipeline start with the first task, "ReadLines"?

3) In the log file I see that in the "WriteToBq" task it tries to look up a piece of data as a field; for example "1st Grade Teachers" is not a field but data for "GroupName":

    "message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",

There are a few problems in your code. But, first, regarding the "DropInputs" stage - you can safely ignore it. It is the result of this bug report. I still don't understand why it needs to be shown (it's confusing a lot of our users too), and I'd love for a Googler to chime in here. IMHO it's just clutter.

OK, now to your code:

  1. You are assuming that the first row read is your header. This is not a correct assumption. Dataflow reads in parallel, so your header row may arrive at any time. Instead of using a boolean flag to check, check the String value itself each time in your ParDo, e.g. if (c.element().contains("GroupName")) then.. (a sketch follows after this list).
  2. You are missing the BigQuery table schema. You need to add withSchema(..) to your BigQuery sink. Here's an example from one of my public pipelines; a rough sketch of the same idea also follows below.
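
To make the first point concrete, here is a minimal sketch of the converter with a per-element header check instead of the boolean flag. The hard-coded column names are an assumption taken from the sample file's header, and the import paths assume the Beam 2.x SDK used in the question:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch only: converts one CSV line to a TableRow, skipping the header by content.
    // It slots into StorgeBq in place of the original nested class; imports go at the top of the file.
    public static class StringToRowConverter extends DoFn<String, TableRow> {

        // Assumed from the header line of the sample CSV above.
        private static final String[] COLUMNS =
                {"GroupName", "Groupcode", "GroupOwner", "GroupCategoryID"};

        @ProcessElement
        public void processElement(ProcessContext c) {
            String line = c.element();

            // The header can be handed to any worker at any time, so detect it
            // by its content rather than by assuming it is the first element seen.
            if (line.startsWith("GroupName,")) {
                return;
            }

            // Limit -1 keeps trailing empty fields such as the missing GroupOwner values.
            String[] parts = line.split(",", -1);
            TableRow row = new TableRow();
            for (int i = 0; i < COLUMNS.length && i < parts.length; i++) {
                row.set(COLUMNS[i], parts[i]);
            }
            c.output(row);
        }
    }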
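
For the second point, this is not the example referred to above, just a hedged sketch of attaching a schema to the existing sink. The field types are assumptions (everything kept as STRING for simplicity), and the schema classes come from com.google.api.services.bigquery.model:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Arrays;

    // Assumed schema matching the CSV columns; adjust types if, for example,
    // GroupOwner or GroupCategoryID should be INTEGER rather than STRING.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("GroupName").setType("STRING"),
            new TableFieldSchema().setName("Groupcode").setType("STRING"),
            new TableFieldSchema().setName("GroupOwner").setType("STRING"),
            new TableFieldSchema().setName("GroupCategoryID").setType("STRING")));

    // The sink from the question, now with the schema attached:
    .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
            .to("mydata-dev:DF_TEST.dataflow_table")
            .withSchema(schema)   // added
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER));

Note that with CREATE_NEVER the target table must already exist in BigQuery; switching to CREATE_IF_NEEDED together with the schema above would let the pipeline create it on first run.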