手动 DAG 运行 设置单个任务状态
Manual DAG run set individual task state
我有一个没有时间表的 DAG(根据需要手动 运行)。它有很多任务。有时我想通过手动将任务状态更改为 SUCCESS 来 'skip' 一些初始任务。更改手动执行的 DAG 的任务状态失败,似乎是因为解析 execution_date.
时存在错误
是否有另一种方法可以为手动执行的 DAG 单独设置任务状态?
下面的示例 运行。任务的执行日期是 01-13T17:27:13.130427,我相信没有正确解析毫秒数。
回溯
回溯(最后一次调用):
文件“/opt/conda/envs/jumpman_prod/lib/python3.6/site-packages/airflow/www/views.py”,第 2372 行,在 set_task_instance_state 中
execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
文件“/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py”,第 565 行,在 _strptime_datetime 中
tt, fraction = _strptime(data_string, 格式)
文件“/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py”,第 365 行,在 _strptime 中
data_string[found.end():])
ValueError:未转换的数据仍然存在:..130427
你可能想做的是使用分支,顾名思义,它允许你根据某些条件遵循不同的执行路径,就像一个if
任何编程语言。
您可以使用 BranchPythonOperator
(已记录 here)来实现此目标:想法是此运算符由 python_callable
配置,该函数输出 task_id
执行下一个(当然,这应该是 BranchPythonOperator
本身直接下游的任务)。
使用分支会自动将 skipped
任务设置为正确的状态,如文档中所述:
All other “branches” or directly downstream tasks are marked with a state of skipped
so that these paths can’t move forward. The skipped
states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.
生成的 DAG 将如下所示:
(来源:apache.org)
在 Apache Airflow 官方文档中记录了分支 here。
它在 任务实例 页面中不起作用,但您可以在另一个页面中进行操作:
- 打开 DAG 图视图
- select 需要 运行(屏幕 1)并单击 go
- select 需要任务
- 在弹出窗口 window 中单击 标记成功(屏幕 2)
- 然后确认。
PS与airflow 1.9版本有关
屏幕 1
屏幕 2
我有一个没有时间表的 DAG(根据需要手动 运行)。它有很多任务。有时我想通过手动将任务状态更改为 SUCCESS 来 'skip' 一些初始任务。更改手动执行的 DAG 的任务状态失败,似乎是因为解析 execution_date.
时存在错误是否有另一种方法可以为手动执行的 DAG 单独设置任务状态?
下面的示例 运行。任务的执行日期是 01-13T17:27:13.130427,我相信没有正确解析毫秒数。
回溯
回溯(最后一次调用): 文件“/opt/conda/envs/jumpman_prod/lib/python3.6/site-packages/airflow/www/views.py”,第 2372 行,在 set_task_instance_state 中 execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') 文件“/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py”,第 565 行,在 _strptime_datetime 中 tt, fraction = _strptime(data_string, 格式) 文件“/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py”,第 365 行,在 _strptime 中 data_string[found.end():]) ValueError:未转换的数据仍然存在:..130427
你可能想做的是使用分支,顾名思义,它允许你根据某些条件遵循不同的执行路径,就像一个if
任何编程语言。
您可以使用 BranchPythonOperator
(已记录 here)来实现此目标:想法是此运算符由 python_callable
配置,该函数输出 task_id
执行下一个(当然,这应该是 BranchPythonOperator
本身直接下游的任务)。
使用分支会自动将 skipped
任务设置为正确的状态,如文档中所述:
All other “branches” or directly downstream tasks are marked with a state of
skipped
so that these paths can’t move forward. Theskipped
states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.
生成的 DAG 将如下所示:
(来源:apache.org)
在 Apache Airflow 官方文档中记录了分支 here。
它在 任务实例 页面中不起作用,但您可以在另一个页面中进行操作:
- 打开 DAG 图视图
- select 需要 运行(屏幕 1)并单击 go
- select 需要任务
- 在弹出窗口 window 中单击 标记成功(屏幕 2)
- 然后确认。
PS与airflow 1.9版本有关
屏幕 1
屏幕 2