有没有办法用 Airflow 在 Dag 中参数化 BigQuery 运算符?
Is there a way to parametrize BigQueryOperators in a Dag with Airflow?
所以我的目标是创建一个带有 BigQueryOperators 的 Dag,我可以在我的 SQL 中发送带有参数化目的地 table 的 Airflow。
我检查了很多关于如何将参数发送到 PythonOperators 以便在 Airflow 中使用 --conf 调用它们的主题,但我不知道如何将相同的方法应用于 BigQueryOperators 的参数。
我的 dag.py 看起来像这样:
import airflow
import blabla..
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
with DAG(
"TestPython",
schedule_interval=None,
default_args=default_args,
max_active_runs=1,
catchup=False,
) as dag:
stepOne = BigQueryOperator(
task_id="stepOne",
sql="SELECT * FROM `testTable` ",
destination_dataset_table=" **variableTable** ",
write_disposition="WRITE_TRUNCATE",
use_legacy_sql=False,
)
stepOne
我想知道是否有一种方法可以使用 airflow trigger_dag 命令或其他方式设置目标 table 名称(当然,当它不存在时具有默认值设置它仍然可以上传到我的 Dag 存储桶中)
如果有什么不清楚的地方,我可以提供更多细节和我尝试过的方法。
是的,您可以将 运行 时间值传递给“destination_dataset_table”,因为它是一个模板化字段。
例如:
my_suffix = "{{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%Y%m%d") }}"
stepOne = BigQueryOperator(
task_id="stepOne",
sql="SELECT * FROM `testTable` ",
destination_dataset_table=f"project_id.dataset_id.table_prefix_{my_suffix}",
write_disposition="WRITE_TRUNCATE",
use_legacy_sql=False,
)
在我的示例中,我使用 Airflow 宏来更改 table 名称来操纵日期,但您可以使用许多其他的,例如 XCOM:
"{{ task_instance.xcom_pull(task_ids='task_id', key='return_value') }}"
对于您的特定用例,我认为 应该可行。
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use it in the DAG file as "{{ dag_run.conf["key"] }}" in templated field.
所以我的目标是创建一个带有 BigQueryOperators 的 Dag,我可以在我的 SQL 中发送带有参数化目的地 table 的 Airflow。 我检查了很多关于如何将参数发送到 PythonOperators 以便在 Airflow 中使用 --conf 调用它们的主题,但我不知道如何将相同的方法应用于 BigQueryOperators 的参数。
我的 dag.py 看起来像这样:
import airflow
import blabla..
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
with DAG(
"TestPython",
schedule_interval=None,
default_args=default_args,
max_active_runs=1,
catchup=False,
) as dag:
stepOne = BigQueryOperator(
task_id="stepOne",
sql="SELECT * FROM `testTable` ",
destination_dataset_table=" **variableTable** ",
write_disposition="WRITE_TRUNCATE",
use_legacy_sql=False,
)
stepOne
我想知道是否有一种方法可以使用 airflow trigger_dag 命令或其他方式设置目标 table 名称(当然,当它不存在时具有默认值设置它仍然可以上传到我的 Dag 存储桶中)
如果有什么不清楚的地方,我可以提供更多细节和我尝试过的方法。
是的,您可以将 运行 时间值传递给“destination_dataset_table”,因为它是一个模板化字段。
例如:
my_suffix = "{{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%Y%m%d") }}"
stepOne = BigQueryOperator(
task_id="stepOne",
sql="SELECT * FROM `testTable` ",
destination_dataset_table=f"project_id.dataset_id.table_prefix_{my_suffix}",
write_disposition="WRITE_TRUNCATE",
use_legacy_sql=False,
)
在我的示例中,我使用 Airflow 宏来更改 table 名称来操纵日期,但您可以使用许多其他的,例如 XCOM:
"{{ task_instance.xcom_pull(task_ids='task_id', key='return_value') }}"
对于您的特定用例,我认为
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use it in the DAG file as "{{ dag_run.conf["key"] }}" in templated field.