Creating/Writing to Partitioned BigQuery table via Google Cloud Dataflow
I would like to take advantage of the new BigQuery feature of time-partitioned tables, but I am unsure whether this is currently possible in the 1.6 version of the Dataflow SDK.
Looking at the BigQuery JSON API, to create a day-partitioned table one needs to pass in a
"timePartitioning": { "type": "DAY" }
option, but the com.google.cloud.dataflow.sdk.io.BigQueryIO interface only allows specifying a TableReference.
I thought that maybe I could pre-create the table, and then sneak in a partition decorator via a BigQueryIO.Write.toTableReference lambda..? Has anyone else had success creating/writing to partitioned tables via Dataflow?
This seems related to the issue of setting the table expiration time, which is also not currently available.
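For reference, pre-creating the day-partitioned table itself can be done outside Dataflow. A minimal sketch using the standalone google-cloud-bigquery Java client (not the Dataflow SDK; the dataset, table, and schema names here are placeholders):

import com.google.cloud.bigquery.*;

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// Placeholder dataset/table/schema; substitute your own.
TableId tableId = TableId.of("dataset", "table");
Schema schema = Schema.of(Field.of("DTTM", LegacySQLTypeName.STRING));
// setTimePartitioning corresponds to "timePartitioning": { "type": "DAY" }
// in the JSON API.
StandardTableDefinition definition = StandardTableDefinition.newBuilder()
    .setSchema(schema)
    .setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY))
    .build();
bigquery.create(TableInfo.of(tableId, definition));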
I believe it should be possible to use the partition decorator when you are not using streaming. We are actively working on supporting partition decorators via streaming. Please let us know if you see any errors today with non-streaming mode.
As Pavan said, it is definitely possible to write to partitioned tables with Dataflow. Is the DataflowPipelineRunner you are using running in streaming mode or batch mode?
The solution you proposed should work. Specifically, if you pre-create a table with date partitioning set up, then you can use a BigQueryIO.Write.toTableReference lambda to write to a date partition. For example:
/**
 * A Joda-time formatter that prints a date in format like {@code "20160101"}.
 * Threadsafe.
 */
private static final DateTimeFormatter FORMATTER =
    DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
    String.format("%s$%s", baseTableName, FORMATTER.print(instant));
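The decorated name can then be passed straight to the sink. A minimal sketch, assuming rows is an existing PCollection<TableRow> from earlier in the pipeline:

rows.apply(BigQueryIO.Write
    .to(partitionName) // e.g. "project:dataset.table$20160101"
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));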
The approach I took (which also works in streaming mode):
- Define a custom window for the incoming record
- Convert the window into the table/partition name
p.apply(PubsubIO.Read
        .subscription(subscription)
        .withCoder(TableRowJsonCoder.of()))
    .apply(Window.into(new TablePartitionWindowFn()))
    .apply(BigQueryIO.Write
        .to(new DayPartitionFunc(dataset, table))
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Setting the window based on the incoming data; the end Instant can be ignored, since the start value is what is used to set the partition:
public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

  private IntervalWindow assignWindow(AssignContext context) {
    TableRow source = (TableRow) context.element();
    String dttm_str = (String) source.get("DTTM");
    DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
    Instant start_point = Instant.parse(dttm_str, formatter);
    // The window length (1 second here) is arbitrary; only the start matters,
    // because DayPartitionFunc derives the partition name from the window start.
    Instant end_point = start_point.withDurationAdded(1000, 1);
    return new IntervalWindow(start_point, end_point);
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
  }

  @Override
  public IntervalWindow getSideInputWindow(BoundedWindow window) {
    if (window instanceof GlobalWindow) {
      throw new IllegalArgumentException(
          "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
    return null;
  }
}
Setting the table partition dynamically:
public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {

  String destination = "";

  public DayPartitionFunc(String dataset, String table) {
    this.destination = dataset + "." + table + "$";
  }

  @Override
  public String apply(BoundedWindow boundedWindow) {
    // The cast below is safe because TablePartitionWindowFn produces IntervalWindows.
    String dayString = DateTimeFormat.forPattern("yyyyMMdd")
        .withZone(DateTimeZone.UTC)
        .print(((IntervalWindow) boundedWindow).start());
    return destination + dayString;
  }
}
Is there a better way of achieving the same outcome?
Apache Beam version 2.0 supports sharded BigQuery output tables out of the box. If you pass the table name in the table_name_YYYYMMDD format, BigQuery will treat it as a sharded table, which can simulate the features of a partitioned table.
See the documentation: https://cloud.google.com/bigquery/docs/partitioned-tables
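For illustration, a minimal Beam 2.x sketch of that idea; the rows collection, schema, DTTM field, and table spec are assumptions carried over from the earlier answer, not part of this one:

// Route each element to a day-sharded destination, table_name_YYYYMMDD.
rows.apply(BigQueryIO.writeTableRows()
    .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> {
      // Assumes each row carries a "DTTM" field formatted "yyyy-MM-dd...".
      String day = ((String) input.getValue().get("DTTM"))
          .substring(0, 10).replace("-", "");
      return new TableDestination("project:dataset.table_name_" + day, null);
    })
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));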
I have written data into BigQuery partitioned tables through Dataflow. These writes are dynamic, in the sense that if data already exists in a given partition I can either append to it or overwrite it. I wrote the code in Python; it is a batch-mode write operation into BigQuery.
from google.cloud import bigquery

client = bigquery.Client(project=projectName)
dataset_ref = client.dataset(datasetName)
table_ref = dataset_ref.table(bqTableName)
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = skipLeadingRows
job_config.source_format = bigquery.SourceFormat.CSV

if tableExists(client, table_ref):
    job_config.autodetect = autoDetect
    previous_rows = client.get_table(table_ref).num_rows
    # assert previous_rows > 0
    if allowJaggedRows is True:
        job_config.allow_jagged_rows = True
    if allowFieldAddition is True:
        job_config.schema_update_options = [
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]
    if isPartitioned is True:
        job_config.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY)
    if schemaList is not None:
        job_config.schema = schemaList
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:
    job_config.autodetect = autoDetect
    job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
    job_config.schema = schemaList
    if isPartitioned is True:
        job_config.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY)

# The load job creates the table when needed (CREATE_IF_NEEDED), so no
# explicit bigquery.Table(...) construction is required here.
load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)
assert load_job.job_type == 'load'
load_job.result()  # wait for the job to complete
assert load_job.state == 'DONE'
It works fine.