AirFlowException - Python_Callable 必须是可调用的
AirFlowException - Python_Callable must be callable
我对现有的工作流程做了一个小改动,它破坏了气流。这是代码:
dag_name = platform + "_" + report['table']
dag = DAG(
dag_name,
catchup=True,
default_args=default_args,
schedule_interval=report['schedule']
)
with dag:
trigger_report = PythonOperator(
task_id=dag.dag_id + '_trigger_report',
python_callable=trigger_report,
provide_context=True,
op_kwargs={
'report_name': report['report'],
'amazonmws_conn_id': default_args['amazonmws_conn_id']
},
dag=dag
)
这是我收到的错误:
airflow.exceptions.AirflowException: python_callable param must be callable
您似乎将 trigger_report
本身作为 python_callable
.
这是故意的吗?它已经有值了吗?
(可能,否则你会得到 NameError: name 'trigger_report' is not defined
)
对于收到此消息的任何其他人,错误是由于任务和 python_callable 函数具有相同的名称。
错误很明显,您将 PythonOperator 放入一个变量中,并将该变量作为 PythonOperator 中的 python 函数调用!!!!
1- python callable 必须是函数而不是变量。
2- 你不应该也不能在函数内调用相同的东西。
3- 在您的情况下,trigger_report
是一项任务,与气流有关,而不是 python 基本事物。
更新类似错误:
如果你愿意按照上面的方法去做,但问题是坚持。
你必须做一个不可调用的,可调用的。
使用functools
举个例子:
import functools
def your_func(value1):
return ""
trigger_report = PythonOperator(
task_id="aaaa",
python_callable=functools.partial(your_func, value1=1),
provide_context=True,
dag=dag
)
我对现有的工作流程做了一个小改动,它破坏了气流。这是代码:
dag_name = platform + "_" + report['table']
dag = DAG(
dag_name,
catchup=True,
default_args=default_args,
schedule_interval=report['schedule']
)
with dag:
trigger_report = PythonOperator(
task_id=dag.dag_id + '_trigger_report',
python_callable=trigger_report,
provide_context=True,
op_kwargs={
'report_name': report['report'],
'amazonmws_conn_id': default_args['amazonmws_conn_id']
},
dag=dag
)
这是我收到的错误:
airflow.exceptions.AirflowException: python_callable param must be callable
您似乎将 trigger_report
本身作为 python_callable
.
这是故意的吗?它已经有值了吗?
(可能,否则你会得到 NameError: name 'trigger_report' is not defined
)
对于收到此消息的任何其他人,错误是由于任务和 python_callable 函数具有相同的名称。
错误很明显,您将 PythonOperator 放入一个变量中,并将该变量作为 PythonOperator 中的 python 函数调用!!!!
1- python callable 必须是函数而不是变量。
2- 你不应该也不能在函数内调用相同的东西。
3- 在您的情况下,trigger_report
是一项任务,与气流有关,而不是 python 基本事物。
更新类似错误:
如果你愿意按照上面的方法去做,但问题是坚持。 你必须做一个不可调用的,可调用的。
使用functools
举个例子:
import functools
def your_func(value1):
return ""
trigger_report = PythonOperator(
task_id="aaaa",
python_callable=functools.partial(your_func, value1=1),
provide_context=True,
dag=dag
)