How to fix airflow error: print_context() missing 1 required positional argument: 'ds'
I have a DAG as follows:
ingest_excel.py:
from __future__ import print_function
import time
from builtins import range
from datetime import timedelta
from pprint import pprint
import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
    'owner': 'rxie',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='ingest_excel',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)
def print_context(**kwargs):
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'
t11_extract_excel_to_csv = PythonOperator(
    task_id='t1_extract_excel_to_csv',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t12_upload_csv_to_hdfs_parquet = PythonOperator(
    task_id='t12_upload_csv_to_hdfs_parquet',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t13_register_parquet_to_impala = PythonOperator(
    task_id='t13_register_parquet_to_impala',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t21_text_to_parquet = PythonOperator(
    task_id='t21_text_to_parquet',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t22_register_parquet_to_impala = PythonOperator(
    task_id='t22_register_parquet_to_impala',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t31_verify_completion = PythonOperator(
    task_id='t31_verify_completion',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t32_send_notification = PythonOperator(
    task_id='t32_send_notification',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)
t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala
t21_text_to_parquet >> t22_register_parquet_to_impala
t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion
t31_verify_completion >> t32_send_notification
#if __name__ == "__main__":
#    dag.cli()
The DAG GUI shows:
Broken DAG: [/root/airflow/dags/ingest_excel.py] python_callable param must be callable
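This is my first time working with Airflow and I'm still very new to it. I'd be grateful if someone could shed some light on this and help me sort it out.
Thanks in advance.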
I'm not entirely sure why your code doesn't work. It should, but here is a workaround:
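def print_context(**kwargs):
    ds = kwargs['ds']  # context values arrive as keyword arguments
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'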
python_callable should also be passed like this:
python_callable=print_context,
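A minimal plain-Python sketch of why this workaround tolerates a signature without a named ds parameter: when Airflow 1.x is told to provide the context, the context values arrive as keyword arguments, so ds is simply a key in kwargs. The fake_context dict below is a hypothetical stand-in for the real task context (Airflow is not needed to run it); 'ds' and 'ds_nodash' only mimic two of Airflow's template variables.

```python
from pprint import pprint


def print_context(**kwargs):
    # With provide_context=True, Airflow 1.x passes the task context as
    # keyword arguments, so 'ds' arrives as a key in kwargs.
    ds = kwargs['ds']
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


# Hypothetical stand-in for the context Airflow builds for a task run.
fake_context = {'ds': '2019-01-01', 'ds_nodash': '20190101'}

result = print_context(**fake_context)

# Without any context there is no 'ds' key, so the lookup fails with a
# KeyError instead of the "missing positional argument" TypeError.
try:
    print_context()
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]
```

The trade-off is that a missing context now surfaces as a KeyError inside the function body rather than as a signature error at call time.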
To elaborate on your problem: your DAG breaks because you are not passing the function print_context to PythonOperator; you are passing the result of calling print_context:
[...]
t32_send_notification = PythonOperator(
    task_id='t32_send_notification',
    provide_context=True,
    python_callable=print_context(), # <-- This is the issue.
    op_kwargs=None,
    dag=dag,
)
[...]
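Your function returns the string 'Whatever you return gets printed in the logs', and that string is what gets passed to PythonOperator as the python_callable keyword argument. (Writing print_context() also runs the function once, while the DAG file is being parsed, rather than when the task executes.) In effect, Airflow would later have to do the following: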
your_return = 'Whatever you return gets printed in the logs'
your_return()
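...which cannot work, because a string is not callable. PythonOperator checks for this up front when the operator is created, which is why the DAG fails to load with the "python_callable param must be callable" error you saw. The other contributor is right that you should change the python_callable keyword argument of PythonOperator to simply print_context.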
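The difference between passing the function and passing its result can be checked with Python's built-in callable(). The check_callable helper below is a hypothetical stand-in that mirrors the kind of validation PythonOperator performs in its constructor; it is not Airflow's code, and it raises a plain TypeError where Airflow raises AirflowException.

```python
def print_context(**kwargs):
    return 'Whatever you return gets printed in the logs'


def check_callable(python_callable):
    # Hypothetical stand-in for PythonOperator's constructor check; Airflow
    # raises AirflowException, a plain TypeError is used here instead.
    if not callable(python_callable):
        raise TypeError('`python_callable` param must be callable')
    return python_callable


# Wrong: print_context() runs right away (at DAG-parse time) and only
# its return value, a str, would reach the operator.
try:
    check_callable(print_context())
    rejected = False
except TypeError as exc:
    rejected = True
    print(exc)

# Right: pass the function object; Airflow calls it when the task runs.
accepted = check_callable(print_context)
```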
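In Airflow 1.x (which this DAG uses, judging by the airflow.operators.python_operator import), you also need to pass the following option to PythonOperator:
provide_context=True
Otherwise the ds argument is not passed to your function. I recently had to make this change myself to get a DAG running. Airflow 2.0 deprecated this flag; there the context is passed automatically.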
Full example:
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)
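The error in the question title, print_context() missing 1 required positional argument: 'ds', is what Python raises when a function declared as print_context(ds, **kwargs) is called without any ds value. The sketch below is a hypothetical, heavily simplified model of how Airflow 1.x invokes the callable: the context is merged into the keyword arguments only when provide_context=True (with op_kwargs taking precedence). run_python_callable and the sample context are illustrative assumptions, not Airflow's API.

```python
def print_context(ds, **kwargs):
    print(ds)
    return 'Whatever you return gets printed in the logs'


def run_python_callable(python_callable, context, provide_context=False,
                        op_kwargs=None):
    # Hypothetical, simplified model of the operator's execute step:
    # the context is only merged in when provide_context=True.
    if provide_context:
        call_kwargs = dict(context)
        call_kwargs.update(op_kwargs or {})
    else:
        call_kwargs = dict(op_kwargs or {})
    return python_callable(**call_kwargs)


context = {'ds': '2019-01-01', 'ds_nodash': '20190101'}

# With provide_context=True, 'ds' is supplied and the call succeeds.
ok = run_python_callable(print_context, context, provide_context=True)

# Without it, nothing supplies 'ds' and Python raises the title's error.
error_message = None
try:
    run_python_callable(print_context, context)
except TypeError as exc:
    error_message = str(exc)
print(error_message)
```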