使用 Luigi Orchestrator 在 BigQuery 中附加作业的输出
output for append job in BigQuery using Luigi Orchestrator
我有一个 Bigquery 任务,它的目的只是将每日温度 table (Table-xxxx-xx-xx
) 附加到现有的 table (PersistingTable
).
我不确定如何处理 output(self)
方法。事实上,我不能只将 PersistingTable
输出为 luigi.contrib.bigquery.BigQueryTarget
,因为它在进程开始之前就已经存在了。有没有人问过自己这样的问题?
我在其他地方找不到答案,所以我会给出我的解决方案,即使这是一个非常古老的问题。
我创建了一个继承自 luigi.contrib.bigquery.BigQueryLoadTask
的新 class
class BigQueryLoadIncremental(luigi.contrib.bigquery.BigQueryLoadTask):
'''
a subclass that checks whether a write-log on gcs exists to append data to the table
needs to define Two Outputs! [0] of type BigQueryTarget and [1] of type GCSTarget
Everything else is left unchanged
'''
def exists(self):
return luigi.contrib.gcs.GCSClient.exists(self.output()[1].path)
@property
def write_disposition(self):
"""
Set to WRITE_APPEND as this subclass only makes sense for this
"""
return luigi.contrib.bigquery.WriteDisposition.WRITE_APPEND
def run(self):
output = self.output()[0]
gcs_output = self.output()[1]
assert isinstance(output,
luigi.contrib.bigquery.BigQueryTarget), 'Output[0] must be a BigQueryTarget, not %s' % (
output)
assert isinstance(gcs_output,
luigi.contrib.gcs.GCSTarget), 'Output[1] must be a Cloud Storage Target, not %s' % (
gcs_output)
bq_client = output.client
source_uris = self.source_uris()
assert all(x.startswith('gs://') for x in source_uris)
job = {
'projectId': output.table.project_id,
'configuration': {
'load': {
'destinationTable': {
'projectId': output.table.project_id,
'datasetId': output.table.dataset_id,
'tableId': output.table.table_id,
},
'encoding': self.encoding,
'sourceFormat': self.source_format,
'writeDisposition': self.write_disposition,
'sourceUris': source_uris,
'maxBadRecords': self.max_bad_records,
'ignoreUnknownValues': self.ignore_unknown_values
}
}
}
if self.source_format == luigi.contrib.bigquery.SourceFormat.CSV:
job['configuration']['load']['fieldDelimiter'] = self.field_delimiter
job['configuration']['load']['skipLeadingRows'] = self.skip_leading_rows
job['configuration']['load']['allowJaggedRows'] = self.allow_jagged_rows
job['configuration']['load']['allowQuotedNewlines'] = self.allow_quoted_new_lines
if self.schema:
job['configuration']['load']['schema'] = {'fields': self.schema}
# test write to and removal of GCS pseudo output in order to make sure this does not fail.
gcs_output.fs.put_string(
'test write for task {} (this file should have been removed immediately)'.format(self.task_id),
gcs_output.path)
gcs_output.fs.remove(gcs_output.path)
bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)
gcs_output.fs.put_string(
'success! The following BigQuery Job went through without errors: {}'.format(self.task_id), gcs_output.path)
它在 google 云存储上使用第二个输出(这可能违反 luigis 原子性原则)。用法示例:
class LeadsToBigQuery(BigQueryLoadIncremental):
date = luigi.DateParameter(default=datetime.date.today())
def output(self):
return luigi.contrib.bigquery.BigQueryTarget(project_id=...,
dataset_id=...,
table_id=...), \
create_gcs_target(...)
我有一个 Bigquery 任务,它的目的只是将每日温度 table (Table-xxxx-xx-xx
) 附加到现有的 table (PersistingTable
).
我不确定如何处理 output(self)
方法。事实上,我不能只将 PersistingTable
输出为 luigi.contrib.bigquery.BigQueryTarget
,因为它在进程开始之前就已经存在了。有没有人问过自己这样的问题?
我在其他地方找不到答案,所以我会给出我的解决方案,即使这是一个非常古老的问题。
我创建了一个继承自 luigi.contrib.bigquery.BigQueryLoadTask
的新 classclass BigQueryLoadIncremental(luigi.contrib.bigquery.BigQueryLoadTask):
'''
a subclass that checks whether a write-log on gcs exists to append data to the table
needs to define Two Outputs! [0] of type BigQueryTarget and [1] of type GCSTarget
Everything else is left unchanged
'''
def exists(self):
return luigi.contrib.gcs.GCSClient.exists(self.output()[1].path)
@property
def write_disposition(self):
"""
Set to WRITE_APPEND as this subclass only makes sense for this
"""
return luigi.contrib.bigquery.WriteDisposition.WRITE_APPEND
def run(self):
output = self.output()[0]
gcs_output = self.output()[1]
assert isinstance(output,
luigi.contrib.bigquery.BigQueryTarget), 'Output[0] must be a BigQueryTarget, not %s' % (
output)
assert isinstance(gcs_output,
luigi.contrib.gcs.GCSTarget), 'Output[1] must be a Cloud Storage Target, not %s' % (
gcs_output)
bq_client = output.client
source_uris = self.source_uris()
assert all(x.startswith('gs://') for x in source_uris)
job = {
'projectId': output.table.project_id,
'configuration': {
'load': {
'destinationTable': {
'projectId': output.table.project_id,
'datasetId': output.table.dataset_id,
'tableId': output.table.table_id,
},
'encoding': self.encoding,
'sourceFormat': self.source_format,
'writeDisposition': self.write_disposition,
'sourceUris': source_uris,
'maxBadRecords': self.max_bad_records,
'ignoreUnknownValues': self.ignore_unknown_values
}
}
}
if self.source_format == luigi.contrib.bigquery.SourceFormat.CSV:
job['configuration']['load']['fieldDelimiter'] = self.field_delimiter
job['configuration']['load']['skipLeadingRows'] = self.skip_leading_rows
job['configuration']['load']['allowJaggedRows'] = self.allow_jagged_rows
job['configuration']['load']['allowQuotedNewlines'] = self.allow_quoted_new_lines
if self.schema:
job['configuration']['load']['schema'] = {'fields': self.schema}
# test write to and removal of GCS pseudo output in order to make sure this does not fail.
gcs_output.fs.put_string(
'test write for task {} (this file should have been removed immediately)'.format(self.task_id),
gcs_output.path)
gcs_output.fs.remove(gcs_output.path)
bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)
gcs_output.fs.put_string(
'success! The following BigQuery Job went through without errors: {}'.format(self.task_id), gcs_output.path)
它在 google 云存储上使用第二个输出(这可能违反 luigis 原子性原则)。用法示例:
class LeadsToBigQuery(BigQueryLoadIncremental):
date = luigi.DateParameter(default=datetime.date.today())
def output(self):
return luigi.contrib.bigquery.BigQueryTarget(project_id=...,
dataset_id=...,
table_id=...), \
create_gcs_target(...)