任务在气流中得到意外参数 'dag'

task got unexpected argument 'dag' in airflow

我创建了一个以 PythonOperator 作为运算符的任务。它使用参数调用另一个文件夹中的函数。但是运算符不接受参数 dag=dag 而实际上它是必须的,因为它用于指向 dag 上下文。

dags/
- my_dag.py
  sub_folder/
  - __init__.py
  - my_functions.py

我的 DAG 包含任务 1 和任务 2。他们将从子文件夹调用该函数,并将参数传递给打印。

my_dag.py

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from sub_folder.my_functions import task1, task2

args = {
    'owner': 'hello',
    'start_date': dt.datetime(2019, 1, 1),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2)
}

dag = DAG(
    'try',
    default_args = args,
    schedule_interval = dt.timedelta(minutes=2))

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    provide_context=True,
    op_kwargs={'idx': "World!"},
    dag=dag,
)

task1 >> task2

可调用函数只是打印传递给它们的参数的简单函数。

my_functions.py

def task1(idx):
    print(f"Task 1! {idx}")

def task2(idx):
    print(f"Task 2! {idx}")

我的任务 1 总是重试 运行 并且有时会失败。我查看了日志以了解发生了什么。我发现它得到了

TypeError: task1() got an unexpected keyword argument 'dag'

我不知道这里发生了什么。显然,我必须调用 dag=dag,这实际上是让运算符指向它必须具有上下文的 dag 容器的一个参数。

my_functions.task1 和名为 task1

PythonOperator 之间存在冲突

尝试:

import sub_folder.my_functions as mf  # changed

task1 = PythonOperator(
    task_id='task1',
    python_callable=mf.task1,  # changed
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)