运行时添加到 DAG 的任务调度失败
Tasks added to DAG during runtime fail to be scheduled
我的想法是让任务 foo
生成输入列表(用户、报告、日志文件等),并为输入列表中的每个元素启动一个任务。目标是利用Airflow的重试和其他逻辑,而不是重新实现它。
因此,理想情况下,我的 DAG 应该如下所示:
这里唯一的变量是生成的任务数。我想在完成所有这些之后再做一些任务,因此为每个任务都启动一个新的 DAG 似乎并不合适。
这是我的代码:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op = DummyOperator(
task_id='dummy',
dag=dag
)
dummy_op.set_upstream(gen_subdag_node_op)
bar_operator = DummyOperator(
task_id='bar',
dag=dag)
bar_operator.set_upstream(dummy_op)
在日志中,我可以看到 gen_nodes
已正确执行(即 Iterating worker 5
,等等)。但是,新任务没有安排,也没有执行的证据。
我在网上找到了相关的代码示例,such as this,但无法使其工作。我错过了什么吗?
或者,是否有更合适的方法来解决这个问题(隔离工作单元)?
此时,当 dag 运行ning 时,airflow 不支持 adding/removing 任务。
工作流顺序将是 dag 开始时评估的任何顺序 运行。
See the second paragraph here.
这意味着您不能 add/remove 根据 运行 中发生的事情执行任务。您可以根据与 运行 无关的内容在 for 循环中添加 X 任务,但在 运行 开始后,工作流程 shape/order 不会发生变化。
很多时候你可以使用 BranchPythonOperator
在 dag 运行 中做出决定(这些决定可以基于你的 xcom
值)但它们必须是决定沿着工作流程中已经存在的分支走下去。
Dag 运行s,和 Dag 定义在气流中以不完全直观的方式分开,但或多或少是 created/generated 在 dag 运行 中的任何东西( xcom
、dag_run.conf
等)不能用于定义 dag 本身。
我的想法是让任务 foo
生成输入列表(用户、报告、日志文件等),并为输入列表中的每个元素启动一个任务。目标是利用Airflow的重试和其他逻辑,而不是重新实现它。
因此,理想情况下,我的 DAG 应该如下所示:
这里唯一的变量是生成的任务数。我想在完成所有这些之后再做一些任务,因此为每个任务都启动一个新的 DAG 似乎并不合适。
这是我的代码:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op = DummyOperator(
task_id='dummy',
dag=dag
)
dummy_op.set_upstream(gen_subdag_node_op)
bar_operator = DummyOperator(
task_id='bar',
dag=dag)
bar_operator.set_upstream(dummy_op)
在日志中,我可以看到 gen_nodes
已正确执行(即 Iterating worker 5
,等等)。但是,新任务没有安排,也没有执行的证据。
我在网上找到了相关的代码示例,such as this,但无法使其工作。我错过了什么吗?
或者,是否有更合适的方法来解决这个问题(隔离工作单元)?
此时,当 dag 运行ning 时,airflow 不支持 adding/removing 任务。
工作流顺序将是 dag 开始时评估的任何顺序 运行。
See the second paragraph here.
这意味着您不能 add/remove 根据 运行 中发生的事情执行任务。您可以根据与 运行 无关的内容在 for 循环中添加 X 任务,但在 运行 开始后,工作流程 shape/order 不会发生变化。
很多时候你可以使用 BranchPythonOperator
在 dag 运行 中做出决定(这些决定可以基于你的 xcom
值)但它们必须是决定沿着工作流程中已经存在的分支走下去。
Dag 运行s,和 Dag 定义在气流中以不完全直观的方式分开,但或多或少是 created/generated 在 dag 运行 中的任何东西( xcom
、dag_run.conf
等)不能用于定义 dag 本身。