使用 Luigi Orchestrator 在 BigQuery 中附加作业的输出

output for append job in BigQuery using Luigi Orchestrator

我有一个 BigQuery 任务,它的目的只是将每日临时 table (Table-xxxx-xx-xx) 附加到现有的 table (PersistingTable)。

我不确定如何处理 output(self) 方法。事实上,我不能直接把 PersistingTable 作为 luigi.contrib.bigquery.BigQueryTarget 输出,因为它在进程开始之前就已经存在了,Luigi 会因此认为任务已经完成。有没有人遇到过这样的问题?

我在其他地方找不到答案,所以我会给出我的解决方案,即使这是一个非常古老的问题。

我创建了一个继承自 luigi.contrib.bigquery.BigQueryLoadTask 的新 class:

class BigQueryLoadIncremental(luigi.contrib.bigquery.BigQueryLoadTask):
    """Load task that appends to an existing BigQuery table.

    Because the destination table already exists before the task runs, a
    marker file on Google Cloud Storage is written after a successful load
    and used as the write-log for this task.

    Subclasses must return TWO outputs from ``output()``:

    * ``[0]`` -- a ``luigi.contrib.bigquery.BigQueryTarget`` (destination table)
    * ``[1]`` -- a ``luigi.contrib.gcs.GCSTarget`` (marker / write-log file)

    Everything else is left unchanged from ``BigQueryLoadTask``.
    """

    def exists(self):
        """Return whether the GCS marker file (output ``[1]``) exists."""
        # NOTE(review): luigi itself decides completeness through
        # Task.complete(); confirm whether this helper is meant to feed it.
        marker = self.output()[1]
        # Use the target's own client instance; calling GCSClient.exists on
        # the class would bind the path to ``self`` and fail.
        return marker.fs.exists(marker.path)

    @property
    def write_disposition(self):
        """Always WRITE_APPEND: this subclass only makes sense for appends."""
        return luigi.contrib.bigquery.WriteDisposition.WRITE_APPEND

    def run(self):
        """Run the BigQuery load job, then write the GCS marker file.

        Raises:
            AssertionError: if output ``[0]`` is not a ``BigQueryTarget``,
                output ``[1]`` is not a ``GCSTarget``, or any source URI does
                not start with ``gs://``.
        """
        outputs = self.output()
        output = outputs[0]
        gcs_output = outputs[1]
        # One-element tuples keep the %-formatting safe even if an output
        # happens to be a tuple itself.
        assert isinstance(output, luigi.contrib.bigquery.BigQueryTarget), \
            'Output[0] must be a BigQueryTarget, not %s' % (output,)
        assert isinstance(gcs_output, luigi.contrib.gcs.GCSTarget), \
            'Output[1] must be a Cloud Storage Target, not %s' % (gcs_output,)

        bq_client = output.client

        source_uris = self.source_uris()
        assert all(x.startswith('gs://') for x in source_uris)

        job = {
            'projectId': output.table.project_id,
            'configuration': {
                'load': {
                    'destinationTable': {
                        'projectId': output.table.project_id,
                        'datasetId': output.table.dataset_id,
                        'tableId': output.table.table_id,
                    },
                    'encoding': self.encoding,
                    'sourceFormat': self.source_format,
                    'writeDisposition': self.write_disposition,
                    'sourceUris': source_uris,
                    'maxBadRecords': self.max_bad_records,
                    'ignoreUnknownValues': self.ignore_unknown_values
                }
            }
        }

        # CSV-only load options.
        if self.source_format == luigi.contrib.bigquery.SourceFormat.CSV:
            job['configuration']['load']['fieldDelimiter'] = self.field_delimiter
            job['configuration']['load']['skipLeadingRows'] = self.skip_leading_rows
            job['configuration']['load']['allowJaggedRows'] = self.allow_jagged_rows
            job['configuration']['load']['allowQuotedNewlines'] = self.allow_quoted_new_lines

        if self.schema:
            job['configuration']['load']['schema'] = {'fields': self.schema}

        # Test write to and removal of the GCS pseudo output BEFORE loading,
        # so a storage failure cannot happen after the rows are appended.
        gcs_output.fs.put_string(
            'test write for task {} (this file should have been removed immediately)'.format(self.task_id),
            gcs_output.path)
        gcs_output.fs.remove(gcs_output.path)

        bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)

        # The marker is written only after the load job returned without raising.
        gcs_output.fs.put_string(
            'success! The following BigQuery Job went through without errors: {}'.format(self.task_id),
            gcs_output.path)

它在 Google Cloud Storage 上使用第二个输出作为写入标记(这可能违反 Luigi 的原子性原则)。用法示例:

class LeadsToBigQuery(BigQueryLoadIncremental):
    """Usage example: an incremental load task parameterised by date.

    The ``...`` placeholders must be replaced with real values.
    """

    # The default is evaluated once, when the class body is executed.
    date = luigi.DateParameter(default=datetime.date.today())

    def output(self):
        """Return the pair (BigQuery table target, GCS target)."""
        table_target = luigi.contrib.bigquery.BigQueryTarget(
            project_id=...,
            dataset_id=...,
            table_id=...,
        )
        marker_target = create_gcs_target(...)
        return table_target, marker_target