有没有办法用 Airflow 在 Dag 中参数化 BigQuery 运算符?

Is there a way to parametrize BigQueryOperators in a Dag with Airflow?

所以我的目标是创建一个带有 BigQueryOperators 的 Dag,我可以在我的 SQL 中发送带有参数化目的地 table 的 Airflow。 我检查了很多关于如何将参数发送到 PythonOperators 以便在 Airflow 中使用 --conf 调用它们的主题,但我不知道如何将相同的方法应用于 BigQueryOperators 的参数。

我的 dag.py 看起来像这样:


import airflow
import blabla..
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(
    "TestPython",
    schedule_interval=None,
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
) as dag:


    stepOne = BigQueryOperator(
        task_id="stepOne",
        sql="SELECT * FROM `testTable` ",
        destination_dataset_table=" **variableTable** ",
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    stepOne

我想知道是否有一种方法可以使用 airflow trigger_dag 命令或其他方式设置目标 table 名称(当然,当它不存在时具有默认值设置它仍然可以上传到我的 Dag 存储桶中)

如果有什么不清楚的地方,我可以提供更多细节和我尝试过的方法。

是的,您可以将 运行 时间值传递给“destination_dataset_table”,因为它是一个模板化字段。

例如:

my_suffix = "{{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%Y%m%d") }}"
stepOne = BigQueryOperator(
    task_id="stepOne",
    sql="SELECT * FROM `testTable` ",
    destination_dataset_table=f"project_id.dataset_id.table_prefix_{my_suffix}",
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
)

在我的示例中,我使用 Airflow 宏来更改 table 名称来操纵日期,但您可以使用许多其他的,例如 XCOM:

"{{ task_instance.xcom_pull(task_ids='task_id', key='return_value') }}"

对于您的特定用例,我认为 应该可行。

You can pass parameters from the CLI using --conf '{"key":"value"}' and then use it in the DAG file as "{{ dag_run.conf["key"] }}" in templated field.