Airflow 2.0 - 运行 在本地保留 运行 功能
Airflow 2.0 - running locally keeps running the function
我有以下任务保持 运行ning 我知道这一点,因为它 运行 是 Snowflake 中的一个查询,我不断收到 DUO 推送通知。每一个。 5秒!当 DAG 运行s
时,我能做些什么来阻止它并且只有它 运行
这是任务:
create_foreign_keys = SnowflakeQueryOperator(
dag=dag,
task_id='check_and_run_foreign_key_query',
sql=SnowHook().run_fk_alter_statements(schema,query),
trigger_rule=TriggerRule.ALL_DONE
)
这是在 sql 部分调用的方法:
def run_fk_alter_statements(self, schema, additional_fk):
fk_query_path = "/fkeys.sql"
fd = open(f'{fk_query_path}', 'r')
query = fd.read()
fd.close()
additions = []
for fk in additional_fk:
additions.append(f""" or (t2.table_name = '{fk['table_name']}' and t2.column_name = '{fk['column_name']}'
and t1.table_name = '{fk['ref_table_name']}' and t1.column_name = '{fk['ref_column_name']}')\n""".upper())
raw_out = self.execute_query(query.format(schema=schema, fks=''.join(additions)), fetch_all=True)
query_jobs = []
for raw_query in raw_out:
query_jobs.append(raw_query[0])
return query_jobs
SnowflakeQueryOperator
实例化中的 sql=SnowHook().run_fk_alter_statements(schema,query)
调用实际上是顶级代码,因此它会在每次调度程序解析 DAG 时执行。您需要找到一种方法在运算符的 execute()
方法中调用该函数。
您可以添加一个 TaskFlow 函数/PythonOperator
任务以将输出从 run_fk_alter_statements()
推送到 XCom
,然后 SnowflakeQueryOperator
使用此 XCom
来执行生成的 SQL(s)。
我有以下任务保持 运行ning 我知道这一点,因为它 运行 是 Snowflake 中的一个查询,我不断收到 DUO 推送通知。每一个。 5秒!当 DAG 运行s
时,我能做些什么来阻止它并且只有它 运行这是任务:
create_foreign_keys = SnowflakeQueryOperator(
dag=dag,
task_id='check_and_run_foreign_key_query',
sql=SnowHook().run_fk_alter_statements(schema,query),
trigger_rule=TriggerRule.ALL_DONE
)
这是在 sql 部分调用的方法:
def run_fk_alter_statements(self, schema, additional_fk):
fk_query_path = "/fkeys.sql"
fd = open(f'{fk_query_path}', 'r')
query = fd.read()
fd.close()
additions = []
for fk in additional_fk:
additions.append(f""" or (t2.table_name = '{fk['table_name']}' and t2.column_name = '{fk['column_name']}'
and t1.table_name = '{fk['ref_table_name']}' and t1.column_name = '{fk['ref_column_name']}')\n""".upper())
raw_out = self.execute_query(query.format(schema=schema, fks=''.join(additions)), fetch_all=True)
query_jobs = []
for raw_query in raw_out:
query_jobs.append(raw_query[0])
return query_jobs
SnowflakeQueryOperator
实例化中的 sql=SnowHook().run_fk_alter_statements(schema,query)
调用实际上是顶级代码,因此它会在每次调度程序解析 DAG 时执行。您需要找到一种方法在运算符的 execute()
方法中调用该函数。
您可以添加一个 TaskFlow 函数/PythonOperator
任务以将输出从 run_fk_alter_statements()
推送到 XCom
,然后 SnowflakeQueryOperator
使用此 XCom
来执行生成的 SQL(s)。