如何重复 BigQueryOperator Dag 并将不同的日期传递到我的 sql 文件

How do I repeat BigQueryOperator Dag and pass different dates to my sql file

我有一个查询想 运行 使用 BigQueryOperator。在过去的 21 天里,每天都会 运行。 sql 文件保持不变,但传递给文件的日期发生了变化。因此,例如今天它将 运行 表示今天的日期,然后重复昨天的日期,然后重复 2 天前,一直到 21 天前。所以它将在 2021 年 7 月 14 日 运行,因此我需要将此日期传递到我的 sql 文件。然后它会 运行 7/13/2021,我需要传递到我的 sql 文件的日期是 7/13/2021。我怎样才能让这个 dag 在一个日期范围内重复,并将这个日期动态传递给 sql 文件。

在 BigQueryOperator 中,变量在“user_defined_macros”部分传递,所以我不知道如何更改我传递的日期。我考虑过循环日期数组,但我不知道如何将该日期传递给 BigQueryOperator 中链接的 sql 文件。

我的 sql 文件有 300 行长,所以我在下面包含了一个简单的例子,因为人们似乎要求一个。

DAG

with DAG(
    dag_id,
    schedule_interval='0 12 * * *',
    start_date=datetime(2021, 1, 1),
    template_searchpath='/opt/airflow/dags',
    catchup=False,
    user_defined_macros={"varsToPass":Var1
    }

) as dag:
    query_one = BigQueryOperator(
        task_id='query_one',
        sql='/sql/something.sql',
        use_legacy_sql=False,
        destination_dataset_table ='table',
        write_disposition = 'WRITE_TRUNCATE'
        
    )

sql 文件

SELECT * FROM table WHERE date = {{CHANGING_DATE}}

您的代码令人困惑,因为您描述了 today,today-1 day, ..., today - 21 days 的重复模式,但是您的代码显示 write_disposition = 'WRITE_TRUNCATE' 这意味着只有最后一个查询很重要,因为每个查询都会删除前一个查询的结果。由于没有提供更多信息,我假设您实际上是想 运行 从今天到今天 - 21 天之间的单个查询。 此外,您没有提及您所指的日期是 Airflow execution_date 还是今天的日期。

如果是execution_date则不需要传递任何参数。 SQL 需要是:

SELECT * FROM table WHERE date BETWEEN {{ execution_date }} AND
{{ execution_date - macros.timedelta(days=21) }}

如果是今天那么你需要传递参数 params:

from datetime import datetime
query_one = BigQueryOperator(
    task_id='query_one',
    sql='/sql/something.sql',
    use_legacy_sql=False,
    destination_dataset_table ='table',
    write_disposition = 'WRITE_TRUNCATE',
    params={
            "end": datetime.utcnow().strftime('%Y-%m-%d'),
            "start": (datetime.now() - datetime.timedelta(days=21)).strftime('%Y-%m-%d')
    }
    
)

然后在 SQL 中您可以将其用作:

SELECT * FROM table WHERE date BETWEEN {{ params.start }} AND
{{ params.end }}

我想指出,如果您不使用 execution_date,那么我看不到从 Airflow 传递日期的价值。您可以直接使用 BigQuery 将查询设置为:

SELECT *
FROM table
WHERE date BETWEEN DATE_SUB(current_date(), INTERVAL 21 DAY) AND current_date()

如果我的假设不正确并且您想 运行 21 个查询,那么您可以按照您描述的那样使用循环来完成:

from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
a = []
for i in range(0, 21):
    a.append(
        BigQueryOperator(
            task_id=f'query_{i}',
            sql='/sql/something.sql',
            use_legacy_sql=False,
            destination_dataset_table='table',
            write_disposition='WRITE_TRUNCATE',  # This is probably wrong, I just copied it from your code.
            params={
                "date_value": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
            }
        )

    )
    if i not in [0]:
        a[i - 1] >> a[i]

那么在您的 /sql/something.sql 中查询应该是:

SELECT * FROM table WHERE date = {{ params.date_value }}

如前所述,这将创建一个工作流:

另请注意,BigQueryOperator 已弃用。您应该使用 BigQueryExecuteQueryOperator,它在 Google 提供商中可用,通过

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

有关如何安装 Google 提供程序的更多信息,请参阅下面的第二部分 answer