任务在气流中得到意外参数 'dag'
task got unexpected argument 'dag' in airflow
我创建了一个以 PythonOperator 作为运算符的任务。它使用参数调用另一个文件夹中的函数。但是运算符不接受参数 dag=dag
而实际上它是必须的,因为它用于指向 dag 上下文。
dags/
- my_dag.py
sub_folder/
- __init__.py
- my_functions.py
我的 DAG 包含任务 1 和任务 2。他们将从子文件夹调用该函数,并将参数传递给打印。
my_dag.py
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from sub_folder.my_functions import task1, task2
args = {
'owner': 'hello',
'start_date': dt.datetime(2019, 1, 1),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2)
}
dag = DAG(
'try',
default_args = args,
schedule_interval = dt.timedelta(minutes=2))
task1 = PythonOperator(
task_id='task1',
python_callable=task1,
provide_context=True,
op_kwargs={'idx': "Hello "},
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=task2,
provide_context=True,
op_kwargs={'idx': "World!"},
dag=dag,
)
task1 >> task2
可调用函数只是打印传递给它们的参数的简单函数。
my_functions.py
def task1(idx):
print(f"Task 1! {idx}")
def task2(idx):
print(f"Task 2! {idx}")
我的任务 1 总是重试 运行 并且有时会失败。我查看了日志以了解发生了什么。我发现它得到了
TypeError: task1() got an unexpected keyword argument 'dag'
我不知道这里发生了什么。显然,我必须调用 dag=dag,这实际上是让运算符指向它必须具有上下文的 dag 容器的一个参数。
my_functions.task1
和名为 task1
的 PythonOperator
之间存在冲突
尝试:
import sub_folder.my_functions as mf # changed
task1 = PythonOperator(
task_id='task1',
python_callable=mf.task1, # changed
provide_context=True,
op_kwargs={'idx': "Hello "},
dag=dag,
)
我创建了一个以 PythonOperator 作为运算符的任务。它使用参数调用另一个文件夹中的函数。但是运算符不接受参数 dag=dag
而实际上它是必须的,因为它用于指向 dag 上下文。
dags/
- my_dag.py
sub_folder/
- __init__.py
- my_functions.py
我的 DAG 包含任务 1 和任务 2。他们将从子文件夹调用该函数,并将参数传递给打印。
my_dag.py
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from sub_folder.my_functions import task1, task2
args = {
'owner': 'hello',
'start_date': dt.datetime(2019, 1, 1),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2)
}
dag = DAG(
'try',
default_args = args,
schedule_interval = dt.timedelta(minutes=2))
task1 = PythonOperator(
task_id='task1',
python_callable=task1,
provide_context=True,
op_kwargs={'idx': "Hello "},
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=task2,
provide_context=True,
op_kwargs={'idx': "World!"},
dag=dag,
)
task1 >> task2
可调用函数只是打印传递给它们的参数的简单函数。
my_functions.py
def task1(idx):
print(f"Task 1! {idx}")
def task2(idx):
print(f"Task 2! {idx}")
我的任务 1 总是重试 运行 并且有时会失败。我查看了日志以了解发生了什么。我发现它得到了
TypeError: task1() got an unexpected keyword argument 'dag'
我不知道这里发生了什么。显然,我必须调用 dag=dag,这实际上是让运算符指向它必须具有上下文的 dag 容器的一个参数。
my_functions.task1
和名为 task1
PythonOperator
之间存在冲突
尝试:
import sub_folder.my_functions as mf # changed
task1 = PythonOperator(
task_id='task1',
python_callable=mf.task1, # changed
provide_context=True,
op_kwargs={'idx': "Hello "},
dag=dag,
)