是否有任何形式可以动态地写入 BigQuery 指定目标表的名称?
Is there any form to write to BigQuery specifying the name of destination tables dynamically?
是否有任何形式可以写入 BigQuery 以动态指定目标名称 table?
现在我有:
bigQueryRQ
.apply(BigQueryIO.Write
.named("Write")
.to("project_name:dataset_name.table_name")
.withSchema(Table.create_auditedTableSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
但我需要 "table_name" 作为动态 table 名称,它取决于我要写入的 "tablerow" 数据。
遗憾的是,我们不提供 API 以数据相关的方式命名 BigQuery table。一般来说,数据相关的 BigQuery table 个目标可能容易出错。
也就是说,我们正在努力提高这方面的灵活性。目前没有估计,但我们希望尽快得到它。
我也遇到了同样的问题。
如何按 tags 对行进行分组,并分别为每个组应用 BigQueryIO.Write?
public static class TagMarker extends DoFn<TableRow, TableRow> {
private Map<String, TupleTag<TableRow>> tagMap;
public TagMarker(Map<String, TupleTag<TableRow>> tagMap) {
this.tagMap = tagMap;
}
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow item = c.element();
c.sideOutput(tagMap.get(getTagName(item)), item);
}
private String getTagName(TableRow row) {
// There will be your logic of determinate table by row
return "table" + ((String)row.get("msg")).substring(0, 1);
}
}
private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone> {
@Override
public PDone apply(PCollection<TableRow> input) {
TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
TupleTag<TableRow> tag3 = new TupleTag<TableRow>();
Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
tagMap.put("table1", mainTag);
tagMap.put("table2", tag2);
tagMap.put("table3", tag3);
List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
tags.add(tag2);
tags.add(tag3);
PCollectionTuple result = input.apply(
ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
);
PDone done = null;
for (String tableId : tagMap.keySet()) {
done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
}
return done;
}
private PDone writeToGbq(String tableId, PCollection<TableRow> rows) {
PDone done = rows
.apply(BigQueryIO.Write.named("WriteToGbq")
.to("<project>:<dataset>." + tableId)
.withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
return done;
}
}
我不确定重写变量 完成。这是正确的吗?失败后能否阻止重写GBQ
只有当您在解析行之前知道我们要写入的表列表时,这种方法才适用。
是否有任何形式可以写入 BigQuery 以动态指定目标名称 table?
现在我有:
bigQueryRQ
.apply(BigQueryIO.Write
.named("Write")
.to("project_name:dataset_name.table_name")
.withSchema(Table.create_auditedTableSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
但我需要 "table_name" 作为动态 table 名称,它取决于我要写入的 "tablerow" 数据。
遗憾的是,我们不提供 API 以数据相关的方式命名 BigQuery table。一般来说,数据相关的 BigQuery table 个目标可能容易出错。
也就是说,我们正在努力提高这方面的灵活性。目前没有估计,但我们希望尽快得到它。
我也遇到了同样的问题。 如何按 tags 对行进行分组,并分别为每个组应用 BigQueryIO.Write?
public static class TagMarker extends DoFn<TableRow, TableRow> {
private Map<String, TupleTag<TableRow>> tagMap;
public TagMarker(Map<String, TupleTag<TableRow>> tagMap) {
this.tagMap = tagMap;
}
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow item = c.element();
c.sideOutput(tagMap.get(getTagName(item)), item);
}
private String getTagName(TableRow row) {
// There will be your logic of determinate table by row
return "table" + ((String)row.get("msg")).substring(0, 1);
}
}
private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone> {
@Override
public PDone apply(PCollection<TableRow> input) {
TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
TupleTag<TableRow> tag3 = new TupleTag<TableRow>();
Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
tagMap.put("table1", mainTag);
tagMap.put("table2", tag2);
tagMap.put("table3", tag3);
List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
tags.add(tag2);
tags.add(tag3);
PCollectionTuple result = input.apply(
ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
);
PDone done = null;
for (String tableId : tagMap.keySet()) {
done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
}
return done;
}
private PDone writeToGbq(String tableId, PCollection<TableRow> rows) {
PDone done = rows
.apply(BigQueryIO.Write.named("WriteToGbq")
.to("<project>:<dataset>." + tableId)
.withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
return done;
}
}
我不确定重写变量 完成。这是正确的吗?失败后能否阻止重写GBQ
只有当您在解析行之前知道我们要写入的表列表时,这种方法才适用。