How to run the same tasks in parallel in Airflow?
I want to run the same set of tasks in parallel for different sources, i.e. [A, B, C, D]. The tasks below are identical for every source; right now I have copied the same code four times and run the copies in parallel, which only inflates the line count. I'm looking for a better, ideally modular, way to run the same tasks (same logic for all sources) in parallel while reducing the amount of code.
delete_A_table = BigQueryDeleteTableOperator(
    task_id="delete_A_table",
    deletion_dataset_table=projectId+"."+dataSetId+".daily_A",
    ignore_if_missing=True
)

create_A_ext_table = BigQueryCreateExternalTableOperator(
    task_id="create_A_ext_table",
    table_resource={
        "tableReference": {
            "projectId": 'projectId1',
            "datasetId": 'datasetId1',
            "tableId": "daily_A",
        },
        "externalDataConfiguration": {
            "sourceUris": "gs://loc/source_data_A"+bucket_path,
            "sourceFormat": "AVRO",
            "compression": "NONE",
            "skipLeadingRows": 1,
        },
    },
)
I run the following to make them execute in parallel:
delete_C_table >> create_C_ext_table >> [trigger_C, trigger_daily_vge_report]
delete_B_table >> create_B_ext_table
delete_D_table >> create_D_ext_table
TIA。
== UPDATE ================== NEW CODE ==========================
This is the actual code, with only a few table names replaced.
import os
from datetime import datetime, timedelta
from airflow import models
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator, \
    BigQueryDeleteTableOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator

extract_day = int(models.Variable.get('day'))
ext_table_names = models.Variable.get('ext_table_names')
extract_date = (datetime.now() - timedelta(days=extract_day)).strftime('%Y-%m-%d')
dte = datetime.now() - timedelta(days=extract_day)
year = dte.year
mth = dte.month
day = dte.day
if day < 10:
    day = "0" + str(day)
if mth < 10:
    mth = "0" + str(mth)
bucket_path = "/year=" + str(year) + "/month=" + str(mth) + "/day=" + str(day) + "/*"

projectId = "projectId1"
dataSetId = "datasetid"

default_dag_args = {
    'start_date': datetime(2022, 3, 4),
    'email': ['atul.patil@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

with models.DAG(
        'GCS_to_BQ_ExternalTableForLoop',
        default_args=default_dag_args,
        catchup=False,
        schedule_interval='0 2 * * *'
        # schedule_interval=None
) as dag:
    for table_name in ('A', 'B', 'C', 'D'):
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True
        )
        create_table_ext_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": projectId,
                    "datasetId": dataSetId,
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": f"gs://loc-{table_name}/data_{table_name}{bucket_path}",
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )

    trigger_dag_a = TriggerDagRunOperator(
        task_id='trigger_DAG_A',
        trigger_dag_id='DAG-A',
        execution_date='{{ ds }}',
        reset_dag_run=True
    )
    trigger_dag_b = TriggerDagRunOperator(
        task_id='trigger_DAG_B',
        trigger_dag_id='DAG-B',
        execution_date='{{ ds }}',
        reset_dag_run=True
    )
Now, if I set up the dependencies like this:
delete_table_task >> create_table_ext_task >> [trigger_dag_a, trigger_dag_b]
it gives me a graph like this (which is not what I need).
What I want is something like this:
delete_A_table >> create_A_ext_table >> [trigger_DAG_C, trigger_DAG_D, Different_DAG]
delete_C_table >> create_C_ext_table >> [trigger_DAG_C, trigger_DAG_D, Different_DAG]
delete_B_table >> create_B_ext_table >> [Different_DAG]
delete_D_table >> create_D_ext_table >> [Different_DAG]
That is what I need.
If I understand correctly, you have a fixed number of tables and you want to run the same flow for each table in parallel without duplicating code; something like the following should work for you:
for table_name in ["A", "B"]:
delete_table_task = BigQueryDeleteTableOperator(
task_id=f"delete_{table_name}_table",
deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
ignore_if_missing=True
)
create_ext_table_task = BigQueryCreateExternalTableOperator(
task_id=f"create_{table_name}_ext_table",
table_resource={
"tableReference": {
"projectId": 'projectId1',
"datasetId": 'datasetId1',
"tableId": f"daily_{table_name}",
},
"externalDataConfiguration": {
"sourceUris": f"gs://loc/source_data_{table_name}{bucket_path}",
"sourceFormat": "AVRO",
"compression": "NONE",
"skipLeadingRows": 1,
},
},
)
delete_table_task >> create_ext_table_task
For the case of table C, where you have an extra flow, you can always add a condition inside the loop to attach that flow only when it is needed.
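Putting that together with the layout asked for in the update, a minimal self-contained sketch could look like the code below. This is only an illustration under assumptions: the trigger task names, the trigger_dag_id values and the hard-coded bucket_path are hypothetical stand-ins for the ones in the question, and the trigger tasks are created before the loop so each iteration can reference the same task objects.

from datetime import datetime

from airflow import models
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
    BigQueryDeleteTableOperator,
)

projectId = "projectId1"
dataSetId = "datasetid"
# bucket_path would normally be built from the Airflow Variables as in the DAG above;
# it is hard-coded here only to keep the sketch self-contained.
bucket_path = "/year=2022/month=03/day=04/*"

with models.DAG(
    "gcs_to_bq_external_table_sketch",
    start_date=datetime(2022, 3, 4),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Trigger tasks are created once, before the loop, so every iteration
    # can attach to the same task objects without duplicating task_ids.
    trigger_dag_c = TriggerDagRunOperator(task_id="trigger_DAG_C", trigger_dag_id="DAG-C")
    trigger_dag_d = TriggerDagRunOperator(task_id="trigger_DAG_D", trigger_dag_id="DAG-D")
    trigger_different_dag = TriggerDagRunOperator(
        task_id="trigger_Different_DAG", trigger_dag_id="Different-DAG"
    )

    for table_name in ("A", "B", "C", "D"):
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True,
        )
        create_ext_table_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": projectId,
                    "datasetId": dataSetId,
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": [f"gs://loc-{table_name}/data_{table_name}{bucket_path}"],
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                },
            },
        )

        # Every table gets the base flow; only A and C get the extra triggers.
        delete_table_task >> create_ext_table_task >> trigger_different_dag
        if table_name in ("A", "C"):
            create_ext_table_task >> [trigger_dag_c, trigger_dag_d]

Because the triggers are defined once before the loop, the fan-out is [trigger_DAG_C, trigger_DAG_D, Different_DAG] for A and C, and only Different_DAG for B and D, which matches the desired graph.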
Edit based on the comments:
If you want all the tasks in the loop to be downstream/upstream of other tasks, you have two options:
- Use task groups; see the following example.
- Create the downstream task before the loop and, inside the loop, write delete_table_task >> create_ext_table_task >> downstream_task.
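A minimal sketch of the task-group option, assuming an Airflow 2.x environment where airflow.utils.task_group.TaskGroup and airflow.operators.dummy.DummyOperator are available (the DAG id and task names are hypothetical, and DummyOperator merely stands in for the BigQuery operators above):

from datetime import datetime

from airflow import models
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with models.DAG(
    "task_group_sketch",
    start_date=datetime(2022, 3, 4),
    schedule_interval=None,
    catchup=False,
) as dag:
    # A single downstream task created before the loop.
    downstream_task = DummyOperator(task_id="downstream_task")

    for table_name in ("A", "B", "C", "D"):
        # Each iteration becomes its own collapsible group in the Graph view.
        with TaskGroup(group_id=f"load_{table_name}") as load_group:
            delete_table_task = DummyOperator(task_id=f"delete_{table_name}_table")
            create_ext_table_task = DummyOperator(task_id=f"create_{table_name}_ext_table")
            delete_table_task >> create_ext_table_task

        # Wiring the group itself makes the whole group run before downstream_task.
        load_group >> downstream_task

Grouping each table's flow this way keeps the Graph view readable, and setting the dependency on the group means you do not have to list its individual tasks when pointing them at a shared downstream task.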