How to run the same tasks in parallel in Airflow?

I want to run the same set of tasks in parallel for different sources, i.e. [A, B, C, D]. The tasks below are identical for every source; at the moment I have duplicated the same code four times and run the copies in parallel, which only inflates the number of lines. I would like a better, more modular way to run the same tasks (same logic for all sources) in parallel and reduce my code.

        delete_A_table = BigQueryDeleteTableOperator(
            task_id="delete_A_table",
            deletion_dataset_table=projectId+"."+dataSetId+".daily_A",
            ignore_if_missing=True
        )

        create_A_ext_table = BigQueryCreateExternalTableOperator(
            task_id="create_A_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": 'projectId1',
                    "datasetId": 'datasetId1',
                    "tableId": "daily_A",
                },
                "externalDataConfiguration": {
                    "sourceUris": "gs://loc/source_data_A"+bucket_path,
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )

I set the following dependencies to run them in parallel:

delete_C_table >> create_C_ext_table >> [trigger_C, trigger_daily_vge_report]
delete_B_table >> create_B_ext_table
delete_D_table >> create_D_ext_table

TIA。

== UPDATE ================== New code =====================

Here is the actual code, with only a few table names replaced.

import os
from datetime import datetime, timedelta
from airflow import models
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator, \
    BigQueryDeleteTableOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator

extract_day = int(models.Variable.get('day'))
ext_table_names = models.Variable.get('ext_table_names')

extract_date = (datetime.now() - timedelta(days=extract_day)).strftime('%Y-%m-%d')

dte = datetime.now() - timedelta(days=extract_day)
year = dte.year
mth = dte.month
day = dte.day

if day < 10:
    day = "0" + str(day)

if mth < 10:
    mth = "0" + str(mth)

bucket_path = "/year=" + str(year) + "/month=" + str(mth) + "/day=" + str(day) + "/*"

projectId = "projectId1"
dataSetId = "datasetid"

default_dag_args = {
    'start_date': datetime(2022, 3, 4),
    'email': ['atul.patil@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

with models.DAG(
        'GCS_to_BQ_ExternalTableForLoop',
        default_args=default_dag_args,
        catchup=False,
        schedule_interval='0 2 * * *'
        # schedule_interval=None
) as dag:
    
    for table_name in ('A', 'B','C','D'):
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True
        )
    
        create_table_ext_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": projectId,
                    "datasetId": dataSetId,
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": f"gs://loc-{table_name}/data_{table_name}{bucket_path}",
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )
        
    trigger_dag_a = TriggerDagRunOperator(
         task_id='trigger_DAG_A',
         trigger_dag_id='DAG-A',
         execution_date='{{ ds }}',
         reset_dag_run=True
    )
    
    trigger_dag_b = TriggerDagRunOperator(
         task_id='trigger_DAG_B',
         trigger_dag_id='DAG-B',
         execution_date='{{ ds }}',
         reset_dag_run=True
    )

Now, if I set the dependencies like this:

delete_table_task >> create_table_ext_task >> [trigger_dag_a, trigger_dag_b]

I get a graph like this (not what I need).

What I want is this:

    delete_A_table >> create_A_ext_table >> [trigger_DAG_C, trigger_DAG_D, Different_DAG]
    delete_C_table >> create_C_ext_table >> [trigger_DAG_C, trigger_DAG_D, Different_DAG]
    delete_B_table >> create_B_ext_table >> [Different_DAG]
    delete_D_table >> create_D_ext_table >> [Different_DAG]

This is what I need.

If I understood you correctly, you have a fixed number of tables and you want to run the same flow for each table in parallel without duplicating the code; something like the following should work well for you:

for table_name in ["A", "B"]:
    delete_table_task = BigQueryDeleteTableOperator(
        task_id=f"delete_{table_name}_table",
        deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
        ignore_if_missing=True
    )

    create_ext_table_task = BigQueryCreateExternalTableOperator(
        task_id=f"create_{table_name}_ext_table",
        table_resource={
            "tableReference": {
                "projectId": 'projectId1',
                "datasetId": 'datasetId1',
                "tableId": f"daily_{table_name}",
            },
            "externalDataConfiguration": {
                "sourceUris": f"gs://loc/source_data_{table_name}{bucket_path}",
                "sourceFormat": "AVRO",
                "compression": "NONE",
                "skipLeadingRows": 1,
            },
        },
    )

    delete_table_task >> create_ext_table_task

In the case of table C, where you have an extra flow, you can always add a condition inside the loop to attach the extra tasks when needed, as in the sketch below.
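
For instance, here is a minimal sketch of that conditional, assuming the trigger tasks from your original snippet (trigger_C and trigger_daily_vge_report) are created once before the loop; the trigger_dag_id values are illustrative:

trigger_c = TriggerDagRunOperator(
    task_id="trigger_C",
    trigger_dag_id="DAG-C",  # illustrative DAG id
)
trigger_daily_vge_report = TriggerDagRunOperator(
    task_id="trigger_daily_vge_report",
    trigger_dag_id="daily_vge_report",  # illustrative DAG id
)

for table_name in ["A", "B", "C", "D"]:
    delete_table_task = BigQueryDeleteTableOperator(
        task_id=f"delete_{table_name}_table",
        deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
        ignore_if_missing=True
    )

    create_ext_table_task = BigQueryCreateExternalTableOperator(
        task_id=f"create_{table_name}_ext_table",
        table_resource={
            "tableReference": {
                "projectId": 'projectId1',
                "datasetId": 'datasetId1',
                "tableId": f"daily_{table_name}",
            },
            "externalDataConfiguration": {
                "sourceUris": f"gs://loc/source_data_{table_name}{bucket_path}",
                "sourceFormat": "AVRO",
                "compression": "NONE",
                "skipLeadingRows": 1,
            },
        },
    )

    if table_name == "C":
        # only table C fans out to the extra triggers
        delete_table_task >> create_ext_table_task >> [trigger_c, trigger_daily_vge_report]
    else:
        delete_table_task >> create_ext_table_task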

Edit based on the comments: if you want all the tasks created in the loop to be downstream/upstream of other tasks, you have two options:

  1. Use a task group; see the sketch after this list.
  2. Create the downstream task(s) before the loop and, inside the loop, write delete_table_task >> create_ext_table_task >> downstream_task.
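
For option 1, here is a minimal TaskGroup sketch (Airflow 2), assuming the same per-table tasks as in your updated code and illustrative trigger_dag_id values; whole groups can then be wired to the shared downstream triggers:

from airflow.utils.task_group import TaskGroup

trigger_dag_a = TriggerDagRunOperator(task_id="trigger_DAG_A", trigger_dag_id="DAG-A")
trigger_dag_b = TriggerDagRunOperator(task_id="trigger_DAG_B", trigger_dag_id="DAG-B")

for table_name in ["A", "B", "C", "D"]:
    # each table gets its own delete -> create pair inside a named group
    with TaskGroup(group_id=f"load_{table_name}") as load_group:
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True
        )
        create_ext_table_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": projectId,
                    "datasetId": dataSetId,
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": f"gs://loc-{table_name}/data_{table_name}{bucket_path}",
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )
        delete_table_task >> create_ext_table_task

    # only A and C fan out to the trigger tasks, matching the graph you described
    if table_name in ("A", "C"):
        load_group >> [trigger_dag_a, trigger_dag_b]

This keeps the per-table logic in one place while still letting you attach different downstream tasks to each table; option 2 is the same wiring without the grouping, with the dependency written directly inside the loop.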