来自 table 行的 BigqueryIO 架构
BigqueryIO schema from table row
我有一个 Table 行的 PCollection table 名称和 table 行中的架构,如下所示
我需要将 col1、col2 和 col3 值插入到 Table 中,并使用提到的行中的架构
BigQueryIO.writeTableRows()
我们可以使用 lambda 从行中访问 table 名称,如下所示
BigQueryIO.writeTableRows().to(table行->tablerow.get("Table").toString())
但是我们如何使用 BigQueryIO.writeTableRows().withSchema()
从 table 行访问模式
请帮忙
您需要为每个元素使用 DynamicDestination 到 select table 名称和要应用的架构。
分解文档的代码示例
events.apply(BigQueryIO.<UserEvent>write()
.to(new DynamicDestinations<UserEvent, String>() {
// Here you extract the "key" value for the selection of schema and table.
// If you need the values of the 2 first column, you can create your own structure
// Example: <schema>|<table>
public String getDestination(ValueInSingleWindow<UserEvent> element) {
return element.getValue().getUserId();
}
// Here return the Table destination (dataset and table) according to the "Key"
// If you have the value in your key, you can do key.split("|")[1], according to my previous example
public TableDestination getTable(String user) {
return new TableDestination(tableForUser(user), "Table for user " + user);
}
//Same for the schema here.
public TableSchema getSchema(String user) {
return tableSchemaForUser(user);
}
})
我有一个 Table 行的 PCollection table 名称和 table 行中的架构,如下所示
我需要将 col1、col2 和 col3 值插入到 Table 中,并使用提到的行中的架构 BigQueryIO.writeTableRows()
我们可以使用 lambda 从行中访问 table 名称,如下所示 BigQueryIO.writeTableRows().to(table行->tablerow.get("Table").toString()) 但是我们如何使用 BigQueryIO.writeTableRows().withSchema()
从 table 行访问模式请帮忙
您需要为每个元素使用 DynamicDestination 到 select table 名称和要应用的架构。
分解文档的代码示例
events.apply(BigQueryIO.<UserEvent>write()
.to(new DynamicDestinations<UserEvent, String>() {
// Here you extract the "key" value for the selection of schema and table.
// If you need the values of the 2 first column, you can create your own structure
// Example: <schema>|<table>
public String getDestination(ValueInSingleWindow<UserEvent> element) {
return element.getValue().getUserId();
}
// Here return the Table destination (dataset and table) according to the "Key"
// If you have the value in your key, you can do key.split("|")[1], according to my previous example
public TableDestination getTable(String user) {
return new TableDestination(tableForUser(user), "Table for user " + user);
}
//Same for the schema here.
public TableSchema getSchema(String user) {
return tableSchemaForUser(user);
}
})