使用架构自动检测写入 BigQuery 的数据流作业
Dataflow job to write into BigQuery with schema autodetect
目前我们正在寻找将原始数据转换为通用结构以供进一步分析的最佳方法。我们的数据是JSON个文件,有的文件字段多,有的少,有的可能有数组,但大体上都是一样的结构。
为此,我正在尝试在 Java 中构建 Apache Beam 管道。我所有的管道都基于这个模板:TextIOToBigQuery.java
第一种方法是将整个 JSON 作为字符串加载到一列中,然后使用 JSON Functions in Standard SQL to transform into common structure. This is well described here:
第二种方法是将数据加载到适当的列中。所以现在可以通过标准 SQL 查询数据。它还需要了解模式。可以通过控制台、UI 和其他:Using schema auto-detection 检测到它,但是我没有找到关于如何通过 Java 和 Apache Beam 管道实现这一点的任何信息。
我分析了 BigQueryIO,看起来它没有模式就无法工作(有一个例外,如果 table 已经创建)
正如我之前提到的,新文件可能会带来新的字段,因此应该相应地更新架构。
假设我有三个 JSON 个文件:
1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }
第一个创建新的 table,其中包含一个字符串类型的字段 "field1"。
所以我的 table 应该是这样的:
|field1 |
----------
|"value1"|
第二个做同样的事情,但添加新字段 "field2"。现在我的 table 应该是这样的:
|field1 |field2 |
-------------------
|"value1"|null |
-------------------
|null |"value2"|
第三个 JSON 应将另一个字段 "field10" 添加到架构中,依此类推。真实 JSON 文件可能有 200 个或更多字段。处理这种情况会有多难?
这种转换哪种方式更好?
我做了一些测试,模拟典型的自动检测模式:首先我 运行 通过所有数据来构建一个 Map
所有可能的字段和类型(这里我只是考虑String
或 Integer
为简单起见)。我使用 stateful pipeline to keep track of the fields that have already been seen and save it as a PCollectionView
. This way I can use .withSchemaFromView()
因为架构在管道构建时是未知的。请注意,此方法仅对批处理作业有效。
首先,我创建了一些没有严格模式的虚拟数据,其中每一行可能包含也可能不包含任何字段:
PCollection<KV<Integer, String>> input = p
.apply("Create data", Create.of(
KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
);
我们将读取输入数据并构建我们在数据中看到的不同字段名称的 Map
和基本类型检查以确定它是否包含 INTEGER
或 STRING
。当然,如果需要,这可以扩展。请注意,之前创建的所有数据都分配给了相同的键,因此它们被组合在一起,我们可以构建一个完整的字段列表,但这可能是性能瓶颈。我们具体化输出,以便我们可以将其用作辅助输入:
PCollectionView<Map<String, String>> schemaSideInput = input
.apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {
// A map containing field-type pairs
@StateId("schema")
private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
@ProcessElement
public void processElement(ProcessContext c,
@StateId("schema") ValueState<Map<String, String>> schemaSpec) {
JSONObject message = new JSONObject(c.element().getValue());
Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());
// iterate through fields
message.keySet().forEach(key ->
{
Object value = message.get(key);
if (!current.containsKey(key)) {
String type = "STRING";
try {
Integer.parseInt(value.toString());
type = "INTEGER";
}
catch(Exception e) {}
// uncomment if debugging is needed
// LOG.info("key: "+ key + " value: " + value + " type: " + type);
c.output(KV.of(key, type));
current.put(key, type);
schemaSpec.write(current);
}
});
}
})).apply("Save as Map", View.asMap());
现在我们可以使用之前的 Map
构建包含 BigQuery table 架构的 PCollectionView
:
PCollectionView<Map<String, String>> schemaView = p
.apply("Start", Create.of("Start"))
.apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> schemaFields = c.sideInput(schemaSideInput);
List<TableFieldSchema> fields = new ArrayList<>();
for (Map.Entry<String, String> field : schemaFields.entrySet())
{
fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
// LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
}
TableSchema schema = new TableSchema().setFields(fields);
String jsonSchema;
try {
jsonSchema = Transport.getJsonFactory().toString(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));
}}).withSideInputs(schemaSideInput))
.apply("Save as Singleton", View.asSingleton());
相应地更改完全限定的 table 名称 PROJECT_ID:DATASET_NAME.dynamic_bq_schema
。
最后,在我们的管道中,我们读取数据,将其转换为 TableRow
并使用 .withSchemaFromView(schemaView)
:
将其写入 BigQuery
input
.apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject message = new JSONObject(c.element().getValue());
TableRow row = new TableRow();
message.keySet().forEach(key ->
{
Object value = message.get(key);
row.set(key, value);
});
c.output(row);
}}))
.apply( BigQueryIO.writeTableRows()
.to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
.withSchemaFromView(schemaView)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
完整代码here.
BigQuery table 管道创建的架构:
和生成的稀疏数据:
如果您的数据是基于架构(avro、protobuf 等)序列化的,则您可以 create/update 流作业中的 table 架构。从这个意义上说,它是预定义的,但仍在更新 table 架构作为处理的一部分。
目前我们正在寻找将原始数据转换为通用结构以供进一步分析的最佳方法。我们的数据是JSON个文件,有的文件字段多,有的少,有的可能有数组,但大体上都是一样的结构。
为此,我正在尝试在 Java 中构建 Apache Beam 管道。我所有的管道都基于这个模板:TextIOToBigQuery.java
第一种方法是将整个 JSON 作为字符串加载到一列中,然后使用 JSON Functions in Standard SQL to transform into common structure. This is well described here:
第二种方法是将数据加载到适当的列中。所以现在可以通过标准 SQL 查询数据。它还需要了解模式。可以通过控制台、UI 和其他:Using schema auto-detection 检测到它,但是我没有找到关于如何通过 Java 和 Apache Beam 管道实现这一点的任何信息。
我分析了 BigQueryIO,看起来它没有模式就无法工作(有一个例外,如果 table 已经创建)
正如我之前提到的,新文件可能会带来新的字段,因此应该相应地更新架构。
假设我有三个 JSON 个文件:
1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }
第一个创建新的 table,其中包含一个字符串类型的字段 "field1"。 所以我的 table 应该是这样的:
|field1 |
----------
|"value1"|
第二个做同样的事情,但添加新字段 "field2"。现在我的 table 应该是这样的:
|field1 |field2 |
-------------------
|"value1"|null |
-------------------
|null |"value2"|
第三个 JSON 应将另一个字段 "field10" 添加到架构中,依此类推。真实 JSON 文件可能有 200 个或更多字段。处理这种情况会有多难?
这种转换哪种方式更好?
我做了一些测试,模拟典型的自动检测模式:首先我 运行 通过所有数据来构建一个 Map
所有可能的字段和类型(这里我只是考虑String
或 Integer
为简单起见)。我使用 stateful pipeline to keep track of the fields that have already been seen and save it as a PCollectionView
. This way I can use .withSchemaFromView()
因为架构在管道构建时是未知的。请注意,此方法仅对批处理作业有效。
首先,我创建了一些没有严格模式的虚拟数据,其中每一行可能包含也可能不包含任何字段:
PCollection<KV<Integer, String>> input = p
.apply("Create data", Create.of(
KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
);
我们将读取输入数据并构建我们在数据中看到的不同字段名称的 Map
和基本类型检查以确定它是否包含 INTEGER
或 STRING
。当然,如果需要,这可以扩展。请注意,之前创建的所有数据都分配给了相同的键,因此它们被组合在一起,我们可以构建一个完整的字段列表,但这可能是性能瓶颈。我们具体化输出,以便我们可以将其用作辅助输入:
PCollectionView<Map<String, String>> schemaSideInput = input
.apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {
// A map containing field-type pairs
@StateId("schema")
private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
@ProcessElement
public void processElement(ProcessContext c,
@StateId("schema") ValueState<Map<String, String>> schemaSpec) {
JSONObject message = new JSONObject(c.element().getValue());
Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());
// iterate through fields
message.keySet().forEach(key ->
{
Object value = message.get(key);
if (!current.containsKey(key)) {
String type = "STRING";
try {
Integer.parseInt(value.toString());
type = "INTEGER";
}
catch(Exception e) {}
// uncomment if debugging is needed
// LOG.info("key: "+ key + " value: " + value + " type: " + type);
c.output(KV.of(key, type));
current.put(key, type);
schemaSpec.write(current);
}
});
}
})).apply("Save as Map", View.asMap());
现在我们可以使用之前的 Map
构建包含 BigQuery table 架构的 PCollectionView
:
PCollectionView<Map<String, String>> schemaView = p
.apply("Start", Create.of("Start"))
.apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> schemaFields = c.sideInput(schemaSideInput);
List<TableFieldSchema> fields = new ArrayList<>();
for (Map.Entry<String, String> field : schemaFields.entrySet())
{
fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
// LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
}
TableSchema schema = new TableSchema().setFields(fields);
String jsonSchema;
try {
jsonSchema = Transport.getJsonFactory().toString(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));
}}).withSideInputs(schemaSideInput))
.apply("Save as Singleton", View.asSingleton());
相应地更改完全限定的 table 名称 PROJECT_ID:DATASET_NAME.dynamic_bq_schema
。
最后,在我们的管道中,我们读取数据,将其转换为 TableRow
并使用 .withSchemaFromView(schemaView)
:
input
.apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject message = new JSONObject(c.element().getValue());
TableRow row = new TableRow();
message.keySet().forEach(key ->
{
Object value = message.get(key);
row.set(key, value);
});
c.output(row);
}}))
.apply( BigQueryIO.writeTableRows()
.to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
.withSchemaFromView(schemaView)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
完整代码here.
BigQuery table 管道创建的架构:
和生成的稀疏数据:
如果您的数据是基于架构(avro、protobuf 等)序列化的,则您可以 create/update 流作业中的 table 架构。从这个意义上说,它是预定义的,但仍在更新 table 架构作为处理的一部分。