如何重复 BigQueryOperator Dag 并将不同的日期传递到我的 sql 文件
How do I repeat BigQueryOperator Dag and pass different dates to my sql file
我有一个查询想 运行 使用 BigQueryOperator。在过去的 21 天里,每天都会 运行。 sql 文件保持不变,但传递给文件的日期发生了变化。因此,例如今天它将 运行 表示今天的日期,然后重复昨天的日期,然后重复 2 天前,一直到 21 天前。所以它将在 2021 年 7 月 14 日 运行,因此我需要将此日期传递到我的 sql 文件。然后它会 运行 7/13/2021,我需要传递到我的 sql 文件的日期是 7/13/2021。我怎样才能让这个 dag 在一个日期范围内重复,并将这个日期动态传递给 sql 文件。
在 BigQueryOperator 中,变量在“user_defined_macros”部分传递,所以我不知道如何更改我传递的日期。我考虑过循环日期数组,但我不知道如何将该日期传递给 BigQueryOperator 中链接的 sql 文件。
我的 sql 文件有 300 行长,所以我在下面包含了一个简单的例子,因为人们似乎要求一个。
DAG
with DAG(
dag_id,
schedule_interval='0 12 * * *',
start_date=datetime(2021, 1, 1),
template_searchpath='/opt/airflow/dags',
catchup=False,
user_defined_macros={"varsToPass":Var1
}
) as dag:
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE'
)
sql 文件
SELECT * FROM table WHERE date = {{CHANGING_DATE}}
您的代码令人困惑,因为您描述了 today,today-1 day, ..., today - 21 days
的重复模式,但是您的代码显示 write_disposition = 'WRITE_TRUNCATE'
这意味着只有最后一个查询很重要,因为每个查询都会删除前一个查询的结果。由于没有提供更多信息,我假设您实际上是想 运行 从今天到今天 - 21 天之间的单个查询。
此外,您没有提及您所指的日期是 Airflow execution_date
还是今天的日期。
如果是execution_date
则不需要传递任何参数。 SQL 需要是:
SELECT * FROM table WHERE date BETWEEN {{ execution_date }} AND
{{ execution_date - macros.timedelta(days=21) }}
如果是今天那么你需要传递参数 params:
from datetime import datetime
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE',
params={
"end": datetime.utcnow().strftime('%Y-%m-%d'),
"start": (datetime.now() - datetime.timedelta(days=21)).strftime('%Y-%m-%d')
}
)
然后在 SQL 中您可以将其用作:
SELECT * FROM table WHERE date BETWEEN {{ params.start }} AND
{{ params.end }}
我想指出,如果您不使用 execution_date
,那么我看不到从 Airflow 传递日期的价值。您可以直接使用 BigQuery 将查询设置为:
SELECT *
FROM table
WHERE date BETWEEN DATE_SUB(current_date(), INTERVAL 21 DAY) AND current_date()
如果我的假设不正确并且您想 运行 21 个查询,那么您可以按照您描述的那样使用循环来完成:
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
a = []
for i in range(0, 21):
a.append(
BigQueryOperator(
task_id=f'query_{i}',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table='table',
write_disposition='WRITE_TRUNCATE', # This is probably wrong, I just copied it from your code.
params={
"date_value": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
}
)
)
if i not in [0]:
a[i - 1] >> a[i]
那么在您的 /sql/something.sql
中查询应该是:
SELECT * FROM table WHERE date = {{ params.date_value }}
如前所述,这将创建一个工作流:
另请注意,BigQueryOperator
已弃用。您应该使用 BigQueryExecuteQueryOperator
,它在 Google 提供商中可用,通过
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
有关如何安装 Google 提供程序的更多信息,请参阅下面的第二部分 answer。
我有一个查询想 运行 使用 BigQueryOperator。在过去的 21 天里,每天都会 运行。 sql 文件保持不变,但传递给文件的日期发生了变化。因此,例如今天它将 运行 表示今天的日期,然后重复昨天的日期,然后重复 2 天前,一直到 21 天前。所以它将在 2021 年 7 月 14 日 运行,因此我需要将此日期传递到我的 sql 文件。然后它会 运行 7/13/2021,我需要传递到我的 sql 文件的日期是 7/13/2021。我怎样才能让这个 dag 在一个日期范围内重复,并将这个日期动态传递给 sql 文件。
在 BigQueryOperator 中,变量在“user_defined_macros”部分传递,所以我不知道如何更改我传递的日期。我考虑过循环日期数组,但我不知道如何将该日期传递给 BigQueryOperator 中链接的 sql 文件。
我的 sql 文件有 300 行长,所以我在下面包含了一个简单的例子,因为人们似乎要求一个。
DAG
with DAG(
dag_id,
schedule_interval='0 12 * * *',
start_date=datetime(2021, 1, 1),
template_searchpath='/opt/airflow/dags',
catchup=False,
user_defined_macros={"varsToPass":Var1
}
) as dag:
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE'
)
sql 文件
SELECT * FROM table WHERE date = {{CHANGING_DATE}}
您的代码令人困惑,因为您描述了 today,today-1 day, ..., today - 21 days
的重复模式,但是您的代码显示 write_disposition = 'WRITE_TRUNCATE'
这意味着只有最后一个查询很重要,因为每个查询都会删除前一个查询的结果。由于没有提供更多信息,我假设您实际上是想 运行 从今天到今天 - 21 天之间的单个查询。
此外,您没有提及您所指的日期是 Airflow execution_date
还是今天的日期。
如果是execution_date
则不需要传递任何参数。 SQL 需要是:
SELECT * FROM table WHERE date BETWEEN {{ execution_date }} AND
{{ execution_date - macros.timedelta(days=21) }}
如果是今天那么你需要传递参数 params:
from datetime import datetime
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE',
params={
"end": datetime.utcnow().strftime('%Y-%m-%d'),
"start": (datetime.now() - datetime.timedelta(days=21)).strftime('%Y-%m-%d')
}
)
然后在 SQL 中您可以将其用作:
SELECT * FROM table WHERE date BETWEEN {{ params.start }} AND
{{ params.end }}
我想指出,如果您不使用 execution_date
,那么我看不到从 Airflow 传递日期的价值。您可以直接使用 BigQuery 将查询设置为:
SELECT *
FROM table
WHERE date BETWEEN DATE_SUB(current_date(), INTERVAL 21 DAY) AND current_date()
如果我的假设不正确并且您想 运行 21 个查询,那么您可以按照您描述的那样使用循环来完成:
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
a = []
for i in range(0, 21):
a.append(
BigQueryOperator(
task_id=f'query_{i}',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table='table',
write_disposition='WRITE_TRUNCATE', # This is probably wrong, I just copied it from your code.
params={
"date_value": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
}
)
)
if i not in [0]:
a[i - 1] >> a[i]
那么在您的 /sql/something.sql
中查询应该是:
SELECT * FROM table WHERE date = {{ params.date_value }}
如前所述,这将创建一个工作流:
另请注意,BigQueryOperator
已弃用。您应该使用 BigQueryExecuteQueryOperator
,它在 Google 提供商中可用,通过
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
有关如何安装 Google 提供程序的更多信息,请参阅下面的第二部分 answer。