如何动态嵌套 Airflow DAG?
How to nest an Airflow DAG dynamically?
我有一个包含三个运算符的简单 DAG。第一个是 PythonOperator
具有我们自己的功能,另外两个是来自 airflow.contrib
的标准运算符(准确地说是 FileToGoogleCloudStorageOperator
和 GoogleCloudStorageToBigQueryOperator
)。他们按顺序工作。我们的自定义任务会生成许多文件,通常在 2 到 5 个之间,具体取决于参数。所有这些文件都必须由后续任务单独处理。这意味着我想要几个下游分支,但不知道 DAG 之前到底有多少 运行.
你会如何解决这个问题?
更新:
使用 jhnclvr 在他的 中提到的 BranchPythonOperator
作为出发点,我创建了一个运算符,它会根据条件跳过或继续执行分支。这种方法是可行的,因为最大可能的分支数是已知的并且足够小。
运营商:
class SkipOperator(PythonOperator):
def execute(self, context):
boolean = super(SkipOperator, self).execute(context)
session = settings.Session()
for task in context['task'].downstream_list:
if boolean is False:
ti = TaskInstance(
task, execution_date=context['ti'].execution_date)
ti.state = State.SKIPPED
ti.start_date = datetime.now()
ti.end_date = datetime.now()
session.merge(ti)
session.commit()
session.close()
用法:
def check(i, templates_dict=None, **kwargs):
return len(templates_dict["data_list"].split(",")) > i
dag = DAG(
dag_name,
default_args=default_args,
schedule_interval=None
)
load = CustomOperator(
task_id="load_op",
bash_command=' '.join([
'./command.sh'
'--data-list {{ dag_run.conf["data_list"]|join(",") }}'
]),
dag=dag
)
for i in range(0, 5):
condition = SkipOperator(
task_id=f"{dag_name}_condition_{i}",
python_callable=partial(check, i),
provide_context=True,
templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
dag=dag
)
gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i
load_to_gcs = CustomFileToGoogleCloudStorageOperator(
task_id=f"{dag_name}_to_gs_{i}",
src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
bucket=gs_bucket,
dst=gs_filename,
mime_type='application/json',
google_cloud_storage_conn_id=connection_id,
dag=dag
)
load_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id=f"{dag_name}_to_bq_{i}",
bucket=gs_bucket,
source_objects=[gs_filename, ],
source_format='NEWLINE_DELIMITED_JSON',
destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
bigquery_conn_id=connection_id,
schema_fields={},
google_cloud_storage_conn_id=connection_id,
write_disposition='WRITE_TRUNCATE',
dag=dag
)
condition.set_upstream(load)
load_to_gcs.set_upstream(condition)
load_to_bq.set_upstream(load_to_gcs)
基本上,当 运行 时,您无法将任务添加到 DAG
。您需要提前知道要添加多少任务。
您可以使用一个运算符处理 N 个文件。
或者,如果您有另一个处理文件的单独 dag,您可以触发该文件 DAG
N 次,在 conf 中传递文件名。
See here for an example of the TriggerDagRunOperator
.
See here for the DAG
that would be triggered.
And lastly see this post from which the above examples are from.
我有一个包含三个运算符的简单 DAG。第一个是 PythonOperator
具有我们自己的功能,另外两个是来自 airflow.contrib
的标准运算符(准确地说是 FileToGoogleCloudStorageOperator
和 GoogleCloudStorageToBigQueryOperator
)。他们按顺序工作。我们的自定义任务会生成许多文件,通常在 2 到 5 个之间,具体取决于参数。所有这些文件都必须由后续任务单独处理。这意味着我想要几个下游分支,但不知道 DAG 之前到底有多少 运行.
你会如何解决这个问题?
更新:
使用 jhnclvr 在他的 BranchPythonOperator
作为出发点,我创建了一个运算符,它会根据条件跳过或继续执行分支。这种方法是可行的,因为最大可能的分支数是已知的并且足够小。
运营商:
class SkipOperator(PythonOperator):
def execute(self, context):
boolean = super(SkipOperator, self).execute(context)
session = settings.Session()
for task in context['task'].downstream_list:
if boolean is False:
ti = TaskInstance(
task, execution_date=context['ti'].execution_date)
ti.state = State.SKIPPED
ti.start_date = datetime.now()
ti.end_date = datetime.now()
session.merge(ti)
session.commit()
session.close()
用法:
def check(i, templates_dict=None, **kwargs):
return len(templates_dict["data_list"].split(",")) > i
dag = DAG(
dag_name,
default_args=default_args,
schedule_interval=None
)
load = CustomOperator(
task_id="load_op",
bash_command=' '.join([
'./command.sh'
'--data-list {{ dag_run.conf["data_list"]|join(",") }}'
]),
dag=dag
)
for i in range(0, 5):
condition = SkipOperator(
task_id=f"{dag_name}_condition_{i}",
python_callable=partial(check, i),
provide_context=True,
templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
dag=dag
)
gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i
load_to_gcs = CustomFileToGoogleCloudStorageOperator(
task_id=f"{dag_name}_to_gs_{i}",
src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
bucket=gs_bucket,
dst=gs_filename,
mime_type='application/json',
google_cloud_storage_conn_id=connection_id,
dag=dag
)
load_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id=f"{dag_name}_to_bq_{i}",
bucket=gs_bucket,
source_objects=[gs_filename, ],
source_format='NEWLINE_DELIMITED_JSON',
destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
bigquery_conn_id=connection_id,
schema_fields={},
google_cloud_storage_conn_id=connection_id,
write_disposition='WRITE_TRUNCATE',
dag=dag
)
condition.set_upstream(load)
load_to_gcs.set_upstream(condition)
load_to_bq.set_upstream(load_to_gcs)
基本上,当 运行 时,您无法将任务添加到 DAG
。您需要提前知道要添加多少任务。
您可以使用一个运算符处理 N 个文件。
或者,如果您有另一个处理文件的单独 dag,您可以触发该文件 DAG
N 次,在 conf 中传递文件名。
See here for an example of the TriggerDagRunOperator
.
See here for the DAG
that would be triggered.
And lastly see this post from which the above examples are from.