当下游任务定义依赖于上游结果时如何设置 DAG
How to set up a DAG when downstream task definitions depend on upstream outcomes
我的问题是关于一个 DAG,它根据 MySQL table 中由上游任务删除和重建的行数来动态定义 group of parallel tasks。我遇到的困难是,在我的上游任务中,我 TRUNCATE
这个 table 在再次重建它之前清除它。这就是sherlock_join_and_export_task
。当我这样做时,行数下降到零,并且我的动态生成的任务不再被定义。当 table 恢复时,图的结构也恢复了,但任务不再执行。相反,它们在树视图中显示为黑框:
下面是 sherlock_join_and_export_task
删除行 count = worker.count_online_table()
中引用的 table 后的 DAG:
sherlock_join_and_export_task
完成后,DAG 看起来像这样:
但是,None 的任务已排队并执行。 DAG 只保留 运行 并且什么也没有发生。
在这种情况下我会使用子 DAG 吗?关于如何设置或重写现有 DAG 的任何见解?我 运行 在 AWS ECS 上使用 LocalExecutor
。以下代码供参考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
BATCH_SIZE = 75000
from preprocessing.marketing.minimalist.table_builder import OnlineOfflinePreprocess
worker = OnlineOfflinePreprocess()
def partial_process_flow(batch_size, offset):
worker = OnlineOfflinePreprocess()
worker.import_offline_data()
worker.import_online_data(batch_size, offset)
worker.merge_aurum_to_sherlock()
worker.upload_table('aurum_to_sherlock')
def batch_worker(batch_size, offset, DAG):
return PythonOperator(
task_id="{0}_{1}".format(offset, batch_size),
python_callable=partial_process_flow,
op_args=[batch_size, offset],
dag=DAG)
DAG = DAG(
dag_id='minimalist_data_preproc',
start_date=datetime(2018, 1, 7, 2, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=1,
concurrency=4,
schedule_interval='0 9 * * *', #..4am hora mexico
catchup=False
)
clear_table_task = PythonOperator(
task_id='clear_table_task',
python_callable=worker.clear_marketing_table,
op_args=['aurum_to_sherlock'],
dag=DAG
)
sherlock_join_and_export_task = PythonOperator(
task_id='sherlock_join_and_export_task',
python_callable=worker.join_online_and_send_to_galileo,
dag=DAG
)
sherlock_join_and_export_task >> clear_table_task
count = worker.count_online_table()
if count == 0:
sherlock_join_and_export_task >> batch_worker(-99, -99, DAG) #..dummy task for when left join deleted
else:
format_table_task = PythonOperator(
task_id='format_table_task',
python_callable=worker.format_final_table,
dag=DAG
)
build_attributions_task = PythonOperator(
task_id='build_attributions_task',
python_callable=worker.build_attribution_weightings,
dag=DAG
)
update_attributions_task = PythonOperator(
task_id='update_attributions_task',
python_callable=worker.update_attributions,
dag=DAG
)
first_task = batch_worker(BATCH_SIZE, 0, DAG)
clear_table_task >> first_task
for offset in range(BATCH_SIZE, count, BATCH_SIZE):
first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> format_table_task
format_table_task >> build_attributions_task >> update_attributions_task
这里是 DAG 正在做什么的简化概念:
...
def batch_worker(batch_size, offset, DAG):
#..A function the dynamically generates tasks based on counting the reference table
return dag_task
worker = ClassMethodsForDAG()
count = worker.method_that_counts_reference table()
if count == 0:
delete_and_rebuild_reference_table_task >> batch_worker(-99, -99, DAG)
else:
first_task = batch_worker(BATCH_SIZE, 0, DAG)
clear_table_task >> first_task
for offset in range(BATCH_SIZE, count, BATCH_SIZE):
first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> downstream_task
我为这个用例奋斗了很长时间。简而言之,基于不断变化的资源状态构建的 dag,尤其是数据库 table,在 airflow 中运行不佳。
我的解决方案是编写一个小的自定义运算符,它是 truggerdagoperator 的子类,它执行查询,然后为每个子进程触发 dagruns。
它使流程“加入”下游更有趣,但在我的用例中,我能够使用另一个 dag 解决它,如果给定日期的所有子流程都已完成,它会进行轮询和短路。在其他情况下,分区传感器可以解决问题。
我有几个这样的用例(基于动态源的迭代 dag 触发器),在为使动态 Subdags 工作(很多)而进行了大量斗争之后,我切换到这种“触发子进程”策略并且从那以后一直很好。
注意 - 这可能会为一个 targ(目标)生成大量 dagrun。这使得 UI 在某些地方具有挑战性,但它是可行的(我已经开始直接查询数据库,因为我还没有准备好编写一个插件来执行 UI 东西)
查看您的 dag,我认为您已经实施了一个 non-idempotent 流程,但气流并未真正针对该流程进行配置。而不是 truncating/updating 您正在构建的 table,您可能应该保留配置的任务并仅更新 start_date/end_date 以启用和禁用它们以在任务级别进行调度,甚至运行 所有这些都在每次迭代和脚本中检查 table 如果作业被禁用,则只 运行 一个你好世界。
我的问题是关于一个 DAG,它根据 MySQL table 中由上游任务删除和重建的行数来动态定义 group of parallel tasks。我遇到的困难是,在我的上游任务中,我 TRUNCATE
这个 table 在再次重建它之前清除它。这就是sherlock_join_and_export_task
。当我这样做时,行数下降到零,并且我的动态生成的任务不再被定义。当 table 恢复时,图的结构也恢复了,但任务不再执行。相反,它们在树视图中显示为黑框:
下面是 sherlock_join_and_export_task
删除行 count = worker.count_online_table()
中引用的 table 后的 DAG:
sherlock_join_and_export_task
完成后,DAG 看起来像这样:
None 的任务已排队并执行。 DAG 只保留 运行 并且什么也没有发生。
在这种情况下我会使用子 DAG 吗?关于如何设置或重写现有 DAG 的任何见解?我 运行 在 AWS ECS 上使用 LocalExecutor
。以下代码供参考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
BATCH_SIZE = 75000
from preprocessing.marketing.minimalist.table_builder import OnlineOfflinePreprocess
worker = OnlineOfflinePreprocess()
def partial_process_flow(batch_size, offset):
worker = OnlineOfflinePreprocess()
worker.import_offline_data()
worker.import_online_data(batch_size, offset)
worker.merge_aurum_to_sherlock()
worker.upload_table('aurum_to_sherlock')
def batch_worker(batch_size, offset, DAG):
return PythonOperator(
task_id="{0}_{1}".format(offset, batch_size),
python_callable=partial_process_flow,
op_args=[batch_size, offset],
dag=DAG)
DAG = DAG(
dag_id='minimalist_data_preproc',
start_date=datetime(2018, 1, 7, 2, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=1,
concurrency=4,
schedule_interval='0 9 * * *', #..4am hora mexico
catchup=False
)
clear_table_task = PythonOperator(
task_id='clear_table_task',
python_callable=worker.clear_marketing_table,
op_args=['aurum_to_sherlock'],
dag=DAG
)
sherlock_join_and_export_task = PythonOperator(
task_id='sherlock_join_and_export_task',
python_callable=worker.join_online_and_send_to_galileo,
dag=DAG
)
sherlock_join_and_export_task >> clear_table_task
count = worker.count_online_table()
if count == 0:
sherlock_join_and_export_task >> batch_worker(-99, -99, DAG) #..dummy task for when left join deleted
else:
format_table_task = PythonOperator(
task_id='format_table_task',
python_callable=worker.format_final_table,
dag=DAG
)
build_attributions_task = PythonOperator(
task_id='build_attributions_task',
python_callable=worker.build_attribution_weightings,
dag=DAG
)
update_attributions_task = PythonOperator(
task_id='update_attributions_task',
python_callable=worker.update_attributions,
dag=DAG
)
first_task = batch_worker(BATCH_SIZE, 0, DAG)
clear_table_task >> first_task
for offset in range(BATCH_SIZE, count, BATCH_SIZE):
first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> format_table_task
format_table_task >> build_attributions_task >> update_attributions_task
这里是 DAG 正在做什么的简化概念:
...
def batch_worker(batch_size, offset, DAG):
#..A function the dynamically generates tasks based on counting the reference table
return dag_task
worker = ClassMethodsForDAG()
count = worker.method_that_counts_reference table()
if count == 0:
delete_and_rebuild_reference_table_task >> batch_worker(-99, -99, DAG)
else:
first_task = batch_worker(BATCH_SIZE, 0, DAG)
clear_table_task >> first_task
for offset in range(BATCH_SIZE, count, BATCH_SIZE):
first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> downstream_task
我为这个用例奋斗了很长时间。简而言之,基于不断变化的资源状态构建的 dag,尤其是数据库 table,在 airflow 中运行不佳。
我的解决方案是编写一个小的自定义运算符,它是 truggerdagoperator 的子类,它执行查询,然后为每个子进程触发 dagruns。
它使流程“加入”下游更有趣,但在我的用例中,我能够使用另一个 dag 解决它,如果给定日期的所有子流程都已完成,它会进行轮询和短路。在其他情况下,分区传感器可以解决问题。
我有几个这样的用例(基于动态源的迭代 dag 触发器),在为使动态 Subdags 工作(很多)而进行了大量斗争之后,我切换到这种“触发子进程”策略并且从那以后一直很好。
注意 - 这可能会为一个 targ(目标)生成大量 dagrun。这使得 UI 在某些地方具有挑战性,但它是可行的(我已经开始直接查询数据库,因为我还没有准备好编写一个插件来执行 UI 东西)
查看您的 dag,我认为您已经实施了一个 non-idempotent 流程,但气流并未真正针对该流程进行配置。而不是 truncating/updating 您正在构建的 table,您可能应该保留配置的任务并仅更新 start_date/end_date 以启用和禁用它们以在任务级别进行调度,甚至运行 所有这些都在每次迭代和脚本中检查 table 如果作业被禁用,则只 运行 一个你好世界。