抓取 dags 文件夹以提取 ExternalTask​​Sensor 任务和参数

Scrape dags folder to extract ExternalTaskSensor tasks and parameters

我们广泛使用了 [ExternalTask​​Sensor][1],以至于交叉 dag 依赖项的数量变得难以追踪。因此,我们想要一种方法来提取使用该传感器的所有任务以及传递给这些任务的参数,例如 external_dag_idexternal_task_id。提取此信息将使我们能够创建一个依赖项列表(如果我们需要,也可能是一个图表)。

方法: 到目前为止,我们已经能够使用 list_dags cli 选项来获取所有 dag 的列表。对于每个 dag,我们然后 运行 list_tasks 选项和 -t 参数来获取任务列表和使用的运算符。下一步是检索传递给这些任务的参数,这就是我们卡住的地方。是否有任何官方或非官方的方法来抓取这些数据?

信息: 我们正在 运行ning Airflow 1.10.9 和 Composer 1.11.0。到目前为止,我们的脚本是用 python3 编写的。 [1]: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html

您可以为此利用 Airflow's metadb

  • 要么直接查询

    SELECT operator
    FROM task_instance
    WHERE dag_id = 'my_dag'
      AND task_id = 'my_task';```
    
    
  • 或使用SQLAlchemy

    from airflow.utils.session import provide_session
    from airflow.models import TaskInstance
    
    @provide_session
    def get_operator_name(my_dag_id: str, my_task_id: str, session=None) -> str:
        """Fetch TaskInstance from the database using pickling"""
        task_instance: TaskInstance = session.query(TaskInstance).filter(TaskInstance.dag_id == my_dag_id).filter(TaskInstance.task_id == my_task_id).first()
        return task_instance.operator
    

这种方法的缺点是在 task 至少有一次 运行 之前它不会起作用(并且它的条目已在 TaskInstance table 中创建)


参考

你可以这样做:

dag_models = session.query(DagModel).filter(DagModel.is_active.is_(True)).all()

for dag_model in dag_models:
     dag = dag_model.get_dag()
     for task in dag.task_dict.values():
         if isinstance(task, ExternalTaskSensor):
             do_smth(task.external_dag_id, task.external_task_id)