ERROR: airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
ERROR: airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
这是我尝试 运行 apache-airflow UI
中的程序时显示的错误日志
ERROR [airflow.models.dagbag.DagBag] Failed to import: /d/Program Files/meta airflow/dags/csv-json.py
Traceback (most recent call last):
File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/d/Program Files/meta airflow/dags/csv-json.py", line 39, in <module>
fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 145, in apply_defaults
raise AirflowException("Use keyword arguments when initializing operators")
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Initialization done
该程序是关于将 csv 数据集转换为 structured/nested json 文件
这是代码
import json
import csv
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')
csv_file=Variable.get("csv_path")
def load_csv_data():
with open(csv_file,'r') as file:
data = pd.read_csv(file,index_col='show_id')
return("dataframe created")
def process_into_json(data):
data.groupby(['show_id','type','title'])
data.apply(lambda x: x[['director','cast','country','date_added','release_year','rating','duration','listed_in']].to_dict('records'))
data.reset_index()
data.rename(columns={0:'details'})
return ("processing complete")
def store_into_json(data):
data.to_json('data\sample-supermarket.json',indent=5,orient='records')
return(" storing done")
default_args = {'owner': 'airflow','start_date': datetime(2022, 4, 4),}
with DAG("csv-to-json-conversion",
schedule_interval='@daily',
start_date=datetime(2022, 4, 4),
default_args=default_args,
tags=['conversion_of_data_file']) as dag:
fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
processdata=PythonOperator('process_data',python_callable=process_into_json(),dag=dag)
loaddata=PythonOperator('load_data',python_callable=store_into_json(),dag=dag)
start=DummyOperator("start",dag=dag)
dead=DummyOperator("dead",dag=dag)
end=DummyOperator("end",dag=dag)
start>>collectdata>>[processdata>>loaddata,dead]>>end
检查代码并给我一个错误的解决方案,如果有更好的方法来编写这个程序请提出你的意见
这是我使用的数据集 link
您的代码中有两个错误:
初始化运算符时不使用关键字参数:使用 PythonOperator(task_id='fetch_data', ...)
而不是 PythonOperator('fetch_data', ...)
您的 python_callable
参数必须是可调用的:使用 PythonOperator(...,python_callable=load_csv_data, ...)
而不是 PythonOperator(...,python_callable=load_csv_data(), ...)
with DAG("csv-to-json-conversion",
schedule_interval='@daily',
start_date=datetime(2022, 4, 4),
default_args=default_args,
tags=['conversion_of_data_file']) as dag:
fetchdata=PythonOperator(task_id='fetch_data',python_callable=load_csv_data,dag=dag)
processdata=PythonOperator(task_id='process_data',python_callable=process_into_json,dag=dag)
loaddata=PythonOperator(task_id='load_data',python_callable=store_into_json,dag=dag)
start=DummyOperator(task_id="start",dag=dag)
dead=DummyOperator(task_id="dead",dag=dag)
end=DummyOperator(task_id="end",dag=dag)
start>>fetchdata>>[processdata>>loaddata,dead]>>end
这是我尝试 运行 apache-airflow UI
中的程序时显示的错误日志ERROR [airflow.models.dagbag.DagBag] Failed to import: /d/Program Files/meta airflow/dags/csv-json.py
Traceback (most recent call last):
File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/d/Program Files/meta airflow/dags/csv-json.py", line 39, in <module>
fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 145, in apply_defaults
raise AirflowException("Use keyword arguments when initializing operators")
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Initialization done
该程序是关于将 csv 数据集转换为 structured/nested json 文件 这是代码
import json
import csv
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')
csv_file=Variable.get("csv_path")
def load_csv_data():
with open(csv_file,'r') as file:
data = pd.read_csv(file,index_col='show_id')
return("dataframe created")
def process_into_json(data):
data.groupby(['show_id','type','title'])
data.apply(lambda x: x[['director','cast','country','date_added','release_year','rating','duration','listed_in']].to_dict('records'))
data.reset_index()
data.rename(columns={0:'details'})
return ("processing complete")
def store_into_json(data):
data.to_json('data\sample-supermarket.json',indent=5,orient='records')
return(" storing done")
default_args = {'owner': 'airflow','start_date': datetime(2022, 4, 4),}
with DAG("csv-to-json-conversion",
schedule_interval='@daily',
start_date=datetime(2022, 4, 4),
default_args=default_args,
tags=['conversion_of_data_file']) as dag:
fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
processdata=PythonOperator('process_data',python_callable=process_into_json(),dag=dag)
loaddata=PythonOperator('load_data',python_callable=store_into_json(),dag=dag)
start=DummyOperator("start",dag=dag)
dead=DummyOperator("dead",dag=dag)
end=DummyOperator("end",dag=dag)
start>>collectdata>>[processdata>>loaddata,dead]>>end
检查代码并给我一个错误的解决方案,如果有更好的方法来编写这个程序请提出你的意见 这是我使用的数据集 link
您的代码中有两个错误:
初始化运算符时不使用关键字参数:使用
PythonOperator(task_id='fetch_data', ...)
而不是PythonOperator('fetch_data', ...)
您的
python_callable
参数必须是可调用的:使用PythonOperator(...,python_callable=load_csv_data, ...)
而不是PythonOperator(...,python_callable=load_csv_data(), ...)
with DAG("csv-to-json-conversion",
schedule_interval='@daily',
start_date=datetime(2022, 4, 4),
default_args=default_args,
tags=['conversion_of_data_file']) as dag:
fetchdata=PythonOperator(task_id='fetch_data',python_callable=load_csv_data,dag=dag)
processdata=PythonOperator(task_id='process_data',python_callable=process_into_json,dag=dag)
loaddata=PythonOperator(task_id='load_data',python_callable=store_into_json,dag=dag)
start=DummyOperator(task_id="start",dag=dag)
dead=DummyOperator(task_id="dead",dag=dag)
end=DummyOperator(task_id="end",dag=dag)
start>>fetchdata>>[processdata>>loaddata,dead]>>end