在 Google 数据流/Apache Beam 中读取嵌套的 JSON

Reading nested JSON in Google Dataflow / Apache Beam

可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:

p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));

如果我只想将那些经过最少过滤的日志写入 BigQuery,我可以使用像这样的 DoFn 来实现:

private static class Formatter extends DoFn<TableRow,TableRow> {

        @Override
        public void processElement(ProcessContext c) throws Exception {

            // .clone() since input is immutable
            TableRow output = c.element().clone();

            // remove misleading timestamp field
            output.remove("@timestamp");

            // set timestamp field by using the element's timestamp
            output.set("timestamp", c.timestamp().toString());

            c.output(output);
        }
    }
}

但是,我不知道如何以这种方式访问​​ JSON 文件中的嵌套字段。

  1. 如果 TableRow 包含名为 rRECORD,是否可以访问其 keys/values 而无需进一步 serialization/deserialization?
  2. 如果我需要 serialize/deserialize 自己使用 Jackson 库,使用 TextIO.Read 的标准 Coder 而不是 [=20 更有意义吗=],以恢复我以这种方式失去的一些性能?

编辑

文件以换行符分隔,看起来像这样:

{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}

您最好的选择可能是按照您在 #2 中描述的操作并直接使用 Jackson。最有意义的做法是让 TextIO 读取执行其构建的目的——使用字符串编码器从文件中读取行——然后使用 DoFn 实际解析元素。类似于以下内容:

PCollection<String> lines = pipeline
  .apply(TextIO.from("gs://bucket/..."));
PCollection<TableRow> objects = lines
  .apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(ProcessContext c) {
      String json = c.element();
      SomeObject object = /* parse json using Jackson, etc. */;
      TableRow row = /* create a table row from object */;
      c.output(row);
    }
  });

请注意,您也可以使用多个 ParDos 来执行此操作。