气流:从另一个操作员呼叫一个操作员

Airflow: calling one operator from another

    def func(**kwargs):
            print(config['rds_conn_id'])
            rds = PostgresHooklog(postgres_conn_id=config['rds_conn_id'])
            rds.set_autocommit(rds.get_conn(), True)
            conn = rds
            print(conn)
            # a = []
            previous_e = None
            for i in range(0, 1):
                a = TriggerDagRunOperator(
                        task_id='test_trigger_dagrun'+str(i),
                        trigger_dag_id="example1_trigger_target_dag",
                        provide_context=True,
                        params={'message': 'Hello world'},
                        trigger_rule="all_done",
                        dag=dag,
                    )
                if previous_e:
                    previous_e >> a
                a
                previous_e = a
                # a.execute(dict({ 'message': 'Hello world'}))
                # if i not in [0]:
                #     a[i - 1] >> a[i]
    
    default_args={"owner": "airflow", "start_date": days_ago(2)}
    
    # Define the DAG
    dag = DAG(
        dag_id="example1_trigger_controller_dag",
        default_args={"owner": "airflow", "start_date": days_ago(2)},
        schedule_interval="@once",
        tags=['example']
    )
    with dag:
    # Define the single task in this controller example DAG
        bhuvitest = PythonOperator(
            task_id='python_task',
            python_callable=func,
            dag=dag)

我想通过函数在 PythonOperator 中调用 TriggerDagRunOperator,这将从数据库中获取一些记录。基于记录迭代循环并调用 TriggerDagRunOperator。

  1. 如何使每次迭代顺序而不是并行。在这种情况下它不起作用,即使它没有调用 TriggerDagRunOperator。
  2. 我想将 TriggerDagRunOperator 中的一些参数传递给目标 dag,在这种情况下,变量消息作为参数,但这也不起作用。

非常感谢您的帮助。 谢谢

Airflow 不允许像您一样使用运算符。所有运算符都必须存在于 DAG 上下文中。如 Airflow official tutorial 中所述,DAG 定义“需要快速评估(秒,而不是分钟),因为调度程序将定期执行它以反映任何更改”。因此不建议查询数据库以“即时”创建 DAG。

话虽如此,您可以使用 Variables 动态定义 DAG,甚至是工作人员的本地文件(如果您有多个工作人员,这会更复杂)。 None 两者都是“良好做法”,但您仍然可以使用它。对于 Variables 选项,您可以有两个 DAG,一个用于查询数据库和设置配置变量,另一个用于读取它、遍历它并创建任务。这也不是很推荐,因为每次执行 Variable.get("variable_name").

时都会创建到元数据数据库的连接

针对您的问题:

  1. 要使其顺序化,只需使用 for idx, val in enumerate(iterable) 进行迭代并执行类似以下操作:
if idx > 0:
    iterable[idx - 1] >> iterable[idx]
  1. 要传递一些参数,您可以使用 TriggerDagRunOperator 中的 conf 参数。