AirFlowException - Python_Callable 必须是可调用的

AirFlowException - Python_Callable must be callable

我对现有的工作流程做了一个小改动,它破坏了气流。这是代码:

dag_name = platform + "_" + report['table']

dag = DAG(
    dag_name,
    catchup=True,
    default_args=default_args,
    schedule_interval=report['schedule']
)

with dag:

    trigger_report = PythonOperator(
        task_id=dag.dag_id + '_trigger_report',
        python_callable=trigger_report,
        provide_context=True,
        op_kwargs={
            'report_name': report['report'],
            'amazonmws_conn_id': default_args['amazonmws_conn_id']
        },
        dag=dag
    )

这是我收到的错误:

airflow.exceptions.AirflowException: python_callable param must be callable

您似乎将 trigger_report 本身作为 python_callable.

这是故意的吗?它已经有值了吗?
(可能,否则你会得到 NameError: name 'trigger_report' is not defined

对于收到此消息的任何其他人,错误是由于任务和 python_callable 函数具有相同的名称。

错误很明显,您将 PythonOperator 放入一个变量中,并将该变量作为 PythonOperator 中的 python 函数调用!!!!

1- python callable 必须是函数而不是变量。

2- 你不应该也不能在函数内调用相同的东西。

3- 在您的情况下,trigger_report 是一项任务,与气流有关,而不是 python 基本事物。

更新类似错误:

如果你愿意按照上面的方法去做,但问题是坚持。 你必须做一个不可调用的,可调用的。

使用functools

举个例子:

import functools

def your_func(value1):
    return ""

trigger_report = PythonOperator(
    task_id="aaaa",
    python_callable=functools.partial(your_func, value1=1),
    provide_context=True,
    dag=dag
)