How to transform data before loading into BigQuery in Apache Airflow?

I'm new to Apache Airflow. My task is to read data from Google Cloud Storage, transform it, and upload the transformed data to a BigQuery table. I can fetch the data from the Cloud Storage bucket and load it directly into a BigQuery table, but I'm not sure how to include a transformation step in this pipeline.

Here is my code:

# Import libraries needed for the operation
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Default Argument
default_args = {
    'owner': <OWNER_NAME>,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

# DAG Definition
dag = DAG(
    'load_from_bucket_to_bq',
    schedule_interval='0 * * * *',
    default_args=default_args,
)

# Variable Configurations
BQ_CONN_ID = <CONN_ID>
BQ_PROJECT = <PROJECT_ID>
BQ_DATASET = <DATASET_ID>

with dag:
    # Tasks
    start = DummyOperator(
        task_id='start'
    )

    upload = GoogleCloudStorageToBigQueryOperator(
        task_id='load_from_bucket_to_bigquery',
        bucket=<BUCKET_NAME>,
        source_objects=['*.csv'],
        schema_fields=[
            {'name': 'Active_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Last_Update', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Recovered', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        destination_project_dataset_table=BQ_PROJECT + '.' + BQ_DATASET + '.' + <TABLE_NAME>,
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id=BQ_CONN_ID,
        bigquery_conn_id=BQ_CONN_ID,
    )

    end = DummyOperator(
        task_id='end'
    )

    # Setting Dependencies
    start >> upload >> end

Any help on how to proceed is appreciated. Thanks.

Posting the conversation with @sachinmb27 as an answer. The transformation can be placed in a Python function and invoked at runtime with a PythonOperator. More details on which operators are available in Airflow can be found in the Airflow operators docs.
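A minimal sketch of that approach: the transformation itself is a plain Python function that a PythonOperator's `python_callable` can run between the bucket and the BigQuery load. The column names below mirror the question's `schema_fields`; the cleaning rules (stripping thousands separators, dropping rows without a country) and the `download_transform_reupload` helper name are illustrative assumptions, not something from the question.

```python
import csv
import io

def transform_csv(raw_text):
    """Drop rows without a Country and strip thousands separators
    from the numeric columns, returning cleaned CSV text."""
    reader = csv.DictReader(io.StringIO(raw_text))
    numeric_cols = [c for c in ('Active_Cases', 'New_Cases', 'New_Deaths',
                                'Total_Cases', 'Total_Deaths',
                                'Total_Recovered') if c in reader.fieldnames]
    cleaned = []
    for row in reader:
        if not (row.get('Country') or '').strip():
            continue  # skip rows with no country
        for col in numeric_cols:
            # "1,234" -> "1234"; empty cells default to "0"
            row[col] = (row[col] or '').replace(',', '').strip() or '0'
        cleaned.append(row)
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames)
    writer.writeheader()
    writer.writerows(cleaned)
    return out.getvalue()

# In the DAG, the function would be wrapped in a PythonOperator placed
# between `start` and `upload`, e.g. (Airflow 1.x contrib paths, to match
# the question's imports):
#
#   from airflow.operators.python_operator import PythonOperator
#
#   transform = PythonOperator(
#       task_id='transform_data',
#       python_callable=download_transform_reupload,  # hypothetical helper
#   )
#   start >> transform >> upload >> end
#
# where the helper downloads the object via GoogleCloudStorageHook, runs
# transform_csv, and re-uploads the result so that
# GoogleCloudStorageToBigQueryOperator loads the cleaned file.
```

Alternatively, the transform can write to a staging object and `upload` can point its `source_objects` at the cleaned files, keeping raw and transformed data separate.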