如何动态嵌套 Airflow DAG?

How to nest an Airflow DAG dynamically?

我有一个包含三个运算符的简单 DAG。第一个是 PythonOperator 具有我们自己的功能,另外两个是来自 airflow.contrib 的标准运算符(准确地说是 FileToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator)。他们按顺序工作。我们的自定义任务会生成许多文件,通常在 2 到 5 个之间,具体取决于参数。所有这些文件都必须由后续任务单独处理。这意味着我想要几个下游分支,但不知道 DAG 之前到底有多少 运行.

你会如何解决这个问题?

更新:

使用 jhnclvr 在他的 中提到的 BranchPythonOperator 作为出发点,我创建了一个运算符,它会根据条件跳过或继续执行分支。这种方法是可行的,因为最大可能的分支数是已知的并且足够小。

运营商:

class SkipOperator(PythonOperator):
    def execute(self, context):
        boolean = super(SkipOperator, self).execute(context)
        session = settings.Session()
        for task in context['task'].downstream_list:
            if boolean is False:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                ti.state = State.SKIPPED
                ti.start_date = datetime.now()
                ti.end_date = datetime.now()
                session.merge(ti)
        session.commit()
        session.close()

用法:

def check(i, templates_dict=None, **kwargs):
    return len(templates_dict["data_list"].split(",")) > i

dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=None
)

load = CustomOperator(
    task_id="load_op",
    bash_command=' '.join([
        './command.sh'
        '--data-list {{ dag_run.conf["data_list"]|join(",") }}'
    ]),
    dag=dag
)

for i in range(0, 5):
    condition = SkipOperator(
        task_id=f"{dag_name}_condition_{i}",
        python_callable=partial(check, i),
        provide_context=True,
        templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
        dag=dag
    )
    gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i

    load_to_gcs = CustomFileToGoogleCloudStorageOperator(
        task_id=f"{dag_name}_to_gs_{i}",
        src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
        bucket=gs_bucket,
        dst=gs_filename,
        mime_type='application/json',
        google_cloud_storage_conn_id=connection_id,
        dag=dag
    )
    load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id=f"{dag_name}_to_bq_{i}",
        bucket=gs_bucket,
        source_objects=[gs_filename, ],
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
        bigquery_conn_id=connection_id,
        schema_fields={},
        google_cloud_storage_conn_id=connection_id,
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )

    condition.set_upstream(load)
    load_to_gcs.set_upstream(condition)
    load_to_bq.set_upstream(load_to_gcs)

基本上,当 运行 时,您无法将任务添加到 DAG。您需要提前知道要添加多少任务。

您可以使用一个运算符处理 N 个文件。

或者,如果您有另一个处理文件的单独 dag,您可以触发该文件 DAG N 次,在 conf 中传递文件名。

See here for an example of the TriggerDagRunOperator.

See here for the DAG that would be triggered.

And lastly see this post from which the above examples are from.