抓取 dags 文件夹以提取 ExternalTaskSensor 任务和参数
Scrape dags folder to extract ExternalTaskSensor tasks and parameters
我们广泛使用了 [ExternalTaskSensor][1],以至于交叉 dag 依赖项的数量变得难以追踪。因此,我们想要一种方法来提取使用该传感器的所有任务以及传递给这些任务的参数,例如 external_dag_id
和 external_task_id
。提取此信息将使我们能够创建一个依赖项列表(如果我们需要,也可能是一个图表)。
方法:
到目前为止,我们已经能够使用 list_dags
cli 选项来获取所有 dag 的列表。对于每个 dag,我们然后 运行 list_tasks
选项和 -t
参数来获取任务列表和使用的运算符。下一步是检索传递给这些任务的参数,这就是我们卡住的地方。是否有任何官方或非官方的方法来抓取这些数据?
信息:
我们正在 运行ning Airflow 1.10.9 和 Composer 1.11.0。到目前为止,我们的脚本是用 python3 编写的。
[1]: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html
您可以为此利用 Airflow's metadb。
要么直接查询
SELECT operator
FROM task_instance
WHERE dag_id = 'my_dag'
AND task_id = 'my_task';```
或使用SQLAlchemy
from airflow.utils.session import provide_session
from airflow.models import TaskInstance
@provide_session
def get_operator_name(my_dag_id: str, my_task_id: str, session=None) -> str:
"""Fetch TaskInstance from the database using pickling"""
task_instance: TaskInstance = session.query(TaskInstance).filter(TaskInstance.dag_id == my_dag_id).filter(TaskInstance.task_id == my_task_id).first()
return task_instance.operator
这种方法的缺点是在 task
至少有一次 运行 之前它不会起作用(并且它的条目已在 TaskInstance
table 中创建)
参考
你可以这样做:
dag_models = session.query(DagModel).filter(DagModel.is_active.is_(True)).all()
for dag_model in dag_models:
dag = dag_model.get_dag()
for task in dag.task_dict.values():
if isinstance(task, ExternalTaskSensor):
do_smth(task.external_dag_id, task.external_task_id)
我们广泛使用了 [ExternalTaskSensor][1],以至于交叉 dag 依赖项的数量变得难以追踪。因此,我们想要一种方法来提取使用该传感器的所有任务以及传递给这些任务的参数,例如 external_dag_id
和 external_task_id
。提取此信息将使我们能够创建一个依赖项列表(如果我们需要,也可能是一个图表)。
方法:
到目前为止,我们已经能够使用 list_dags
cli 选项来获取所有 dag 的列表。对于每个 dag,我们然后 运行 list_tasks
选项和 -t
参数来获取任务列表和使用的运算符。下一步是检索传递给这些任务的参数,这就是我们卡住的地方。是否有任何官方或非官方的方法来抓取这些数据?
信息: 我们正在 运行ning Airflow 1.10.9 和 Composer 1.11.0。到目前为止,我们的脚本是用 python3 编写的。 [1]: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html
您可以为此利用 Airflow's metadb。
要么直接查询
SELECT operator FROM task_instance WHERE dag_id = 'my_dag' AND task_id = 'my_task';```
或使用
SQLAlchemy
from airflow.utils.session import provide_session from airflow.models import TaskInstance @provide_session def get_operator_name(my_dag_id: str, my_task_id: str, session=None) -> str: """Fetch TaskInstance from the database using pickling""" task_instance: TaskInstance = session.query(TaskInstance).filter(TaskInstance.dag_id == my_dag_id).filter(TaskInstance.task_id == my_task_id).first() return task_instance.operator
这种方法的缺点是在 task
至少有一次 运行 之前它不会起作用(并且它的条目已在 TaskInstance
table 中创建)
参考
你可以这样做:
dag_models = session.query(DagModel).filter(DagModel.is_active.is_(True)).all()
for dag_model in dag_models:
dag = dag_model.get_dag()
for task in dag.task_dict.values():
if isinstance(task, ExternalTaskSensor):
do_smth(task.external_dag_id, task.external_task_id)