在 Google 数据流/Apache Beam 中读取嵌套的 JSON
Reading nested JSON in Google Dataflow / Apache Beam
可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:
p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));
如果我只想将那些经过最少过滤的日志写入 BigQuery,我可以使用像这样的 DoFn 来实现:
private static class Formatter extends DoFn<TableRow,TableRow> {
@Override
public void processElement(ProcessContext c) throws Exception {
// .clone() since input is immutable
TableRow output = c.element().clone();
// remove misleading timestamp field
output.remove("@timestamp");
// set timestamp field by using the element's timestamp
output.set("timestamp", c.timestamp().toString());
c.output(output);
}
}
}
但是,我不知道如何以这种方式访问 JSON 文件中的嵌套字段。
- 如果 TableRow 包含名为
r
的 RECORD
,是否可以访问其 keys/values 而无需进一步 serialization/deserialization?
- 如果我需要 serialize/deserialize 自己使用
Jackson
库,使用 TextIO.Read
的标准 Coder
而不是 [=20 更有意义吗=],以恢复我以这种方式失去的一些性能?
编辑
文件以换行符分隔,看起来像这样:
{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}
您最好的选择可能是按照您在 #2 中描述的操作并直接使用 Jackson。最有意义的做法是让 TextIO 读取执行其构建的目的——使用字符串编码器从文件中读取行——然后使用 DoFn
实际解析元素。类似于以下内容:
PCollection<String> lines = pipeline
.apply(TextIO.from("gs://bucket/..."));
PCollection<TableRow> objects = lines
.apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(ProcessContext c) {
String json = c.element();
SomeObject object = /* parse json using Jackson, etc. */;
TableRow row = /* create a table row from object */;
c.output(row);
}
});
请注意,您也可以使用多个 ParDos 来执行此操作。
可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:
p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));
如果我只想将那些经过最少过滤的日志写入 BigQuery,我可以使用像这样的 DoFn 来实现:
private static class Formatter extends DoFn<TableRow,TableRow> {
@Override
public void processElement(ProcessContext c) throws Exception {
// .clone() since input is immutable
TableRow output = c.element().clone();
// remove misleading timestamp field
output.remove("@timestamp");
// set timestamp field by using the element's timestamp
output.set("timestamp", c.timestamp().toString());
c.output(output);
}
}
}
但是,我不知道如何以这种方式访问 JSON 文件中的嵌套字段。
- 如果 TableRow 包含名为
r
的RECORD
,是否可以访问其 keys/values 而无需进一步 serialization/deserialization? - 如果我需要 serialize/deserialize 自己使用
Jackson
库,使用TextIO.Read
的标准Coder
而不是 [=20 更有意义吗=],以恢复我以这种方式失去的一些性能?
编辑
文件以换行符分隔,看起来像这样:
{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}
您最好的选择可能是按照您在 #2 中描述的操作并直接使用 Jackson。最有意义的做法是让 TextIO 读取执行其构建的目的——使用字符串编码器从文件中读取行——然后使用 DoFn
实际解析元素。类似于以下内容:
PCollection<String> lines = pipeline
.apply(TextIO.from("gs://bucket/..."));
PCollection<TableRow> objects = lines
.apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(ProcessContext c) {
String json = c.element();
SomeObject object = /* parse json using Jackson, etc. */;
TableRow row = /* create a table row from object */;
c.output(row);
}
});
请注意,您也可以使用多个 ParDos 来执行此操作。