气流:从另一个操作员呼叫一个操作员
Airflow: calling one operator from another
def func(**kwargs):
print(config['rds_conn_id'])
rds = PostgresHooklog(postgres_conn_id=config['rds_conn_id'])
rds.set_autocommit(rds.get_conn(), True)
conn = rds
print(conn)
# a = []
previous_e = None
for i in range(0, 1):
a = TriggerDagRunOperator(
task_id='test_trigger_dagrun'+str(i),
trigger_dag_id="example1_trigger_target_dag",
provide_context=True,
params={'message': 'Hello world'},
trigger_rule="all_done",
dag=dag,
)
if previous_e:
previous_e >> a
a
previous_e = a
# a.execute(dict({ 'message': 'Hello world'}))
# if i not in [0]:
# a[i - 1] >> a[i]
default_args={"owner": "airflow", "start_date": days_ago(2)}
# Define the DAG
dag = DAG(
dag_id="example1_trigger_controller_dag",
default_args={"owner": "airflow", "start_date": days_ago(2)},
schedule_interval="@once",
tags=['example']
)
with dag:
# Define the single task in this controller example DAG
bhuvitest = PythonOperator(
task_id='python_task',
python_callable=func,
dag=dag)
我想通过函数在 PythonOperator 中调用 TriggerDagRunOperator,这将从数据库中获取一些记录。基于记录迭代循环并调用 TriggerDagRunOperator。
- 如何使每次迭代顺序而不是并行。在这种情况下它不起作用,即使它没有调用 TriggerDagRunOperator。
- 我想将 TriggerDagRunOperator 中的一些参数传递给目标 dag,在这种情况下,变量消息作为参数,但这也不起作用。
非常感谢您的帮助。
谢谢
Airflow 不允许像您一样使用运算符。所有运算符都必须存在于 DAG 上下文中。如 Airflow official tutorial 中所述,DAG 定义“需要快速评估(秒,而不是分钟),因为调度程序将定期执行它以反映任何更改”。因此不建议查询数据库以“即时”创建 DAG。
话虽如此,您可以使用 Variables
动态定义 DAG,甚至是工作人员的本地文件(如果您有多个工作人员,这会更复杂)。 None 两者都是“良好做法”,但您仍然可以使用它。对于 Variables
选项,您可以有两个 DAG,一个用于查询数据库和设置配置变量,另一个用于读取它、遍历它并创建任务。这也不是很推荐,因为每次执行 Variable.get("variable_name")
.
时都会创建到元数据数据库的连接
针对您的问题:
- 要使其顺序化,只需使用
for idx, val in enumerate(iterable)
进行迭代并执行类似以下操作:
if idx > 0:
iterable[idx - 1] >> iterable[idx]
- 要传递一些参数,您可以使用
TriggerDagRunOperator
中的 conf
参数。
def func(**kwargs):
print(config['rds_conn_id'])
rds = PostgresHooklog(postgres_conn_id=config['rds_conn_id'])
rds.set_autocommit(rds.get_conn(), True)
conn = rds
print(conn)
# a = []
previous_e = None
for i in range(0, 1):
a = TriggerDagRunOperator(
task_id='test_trigger_dagrun'+str(i),
trigger_dag_id="example1_trigger_target_dag",
provide_context=True,
params={'message': 'Hello world'},
trigger_rule="all_done",
dag=dag,
)
if previous_e:
previous_e >> a
a
previous_e = a
# a.execute(dict({ 'message': 'Hello world'}))
# if i not in [0]:
# a[i - 1] >> a[i]
default_args={"owner": "airflow", "start_date": days_ago(2)}
# Define the DAG
dag = DAG(
dag_id="example1_trigger_controller_dag",
default_args={"owner": "airflow", "start_date": days_ago(2)},
schedule_interval="@once",
tags=['example']
)
with dag:
# Define the single task in this controller example DAG
bhuvitest = PythonOperator(
task_id='python_task',
python_callable=func,
dag=dag)
我想通过函数在 PythonOperator 中调用 TriggerDagRunOperator,这将从数据库中获取一些记录。基于记录迭代循环并调用 TriggerDagRunOperator。
- 如何使每次迭代顺序而不是并行。在这种情况下它不起作用,即使它没有调用 TriggerDagRunOperator。
- 我想将 TriggerDagRunOperator 中的一些参数传递给目标 dag,在这种情况下,变量消息作为参数,但这也不起作用。
非常感谢您的帮助。 谢谢
Airflow 不允许像您一样使用运算符。所有运算符都必须存在于 DAG 上下文中。如 Airflow official tutorial 中所述,DAG 定义“需要快速评估(秒,而不是分钟),因为调度程序将定期执行它以反映任何更改”。因此不建议查询数据库以“即时”创建 DAG。
话虽如此,您可以使用 Variables
动态定义 DAG,甚至是工作人员的本地文件(如果您有多个工作人员,这会更复杂)。 None 两者都是“良好做法”,但您仍然可以使用它。对于 Variables
选项,您可以有两个 DAG,一个用于查询数据库和设置配置变量,另一个用于读取它、遍历它并创建任务。这也不是很推荐,因为每次执行 Variable.get("variable_name")
.
针对您的问题:
- 要使其顺序化,只需使用
for idx, val in enumerate(iterable)
进行迭代并执行类似以下操作:
if idx > 0:
iterable[idx - 1] >> iterable[idx]
- 要传递一些参数,您可以使用
TriggerDagRunOperator
中的conf
参数。