ERROR: airflow.exceptions.AirflowException: Use keyword arguments when initializing operators

This is the error log shown when I try to run the program from the apache-airflow UI:
ERROR [airflow.models.dagbag.DagBag] Failed to import: /d/Program Files/meta airflow/dags/csv-json.py
Traceback (most recent call last):
  File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/d/Program Files/meta airflow/dags/csv-json.py", line 39, in <module>
    fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
  File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 145, in apply_defaults
    raise AirflowException("Use keyword arguments when initializing operators")
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Initialization done

The program converts a CSV dataset into a structured/nested JSON file. Here is the code:

import json
import csv
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable

AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')

csv_file=Variable.get("csv_path")

def load_csv_data():
    with open(csv_file,'r') as file:
        data = pd.read_csv(file,index_col='show_id')
    return("dataframe created")

def process_into_json(data):
    data.groupby(['show_id','type','title'])
    data.apply(lambda x: x[['director','cast','country','date_added','release_year','rating','duration','listed_in']].to_dict('records'))
    data.reset_index()
    data.rename(columns={0:'details'})
    return ("processing complete")

def store_into_json(data):
    data.to_json('data\sample-supermarket.json',indent=5,orient='records')
    return(" storing done")

default_args = {'owner': 'airflow','start_date': datetime(2022, 4, 4),}

with DAG("csv-to-json-conversion",
        schedule_interval='@daily',
        start_date=datetime(2022, 4, 4),
        default_args=default_args,
        tags=['conversion_of_data_file']) as dag:
        
        fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
        
        processdata=PythonOperator('process_data',python_callable=process_into_json(),dag=dag)

        loaddata=PythonOperator('load_data',python_callable=store_into_json(),dag=dag)
        
        start=DummyOperator("start",dag=dag)

        dead=DummyOperator("dead",dag=dag)

        end=DummyOperator("end",dag=dag)

        start>>collectdata>>[processdata>>loaddata,dead]>>end

Please check the code and give me a solution for the error, and if there is a better way to write this program, please share your suggestions. This is the dataset I used: link

There are two errors in your code:

  1. Operators are initialized without keyword arguments: use PythonOperator(task_id='fetch_data', ...) instead of PythonOperator('fetch_data', ...)

  2. The python_callable argument must be a callable: use PythonOperator(..., python_callable=load_csv_data, ...) (the function itself) instead of PythonOperator(..., python_callable=load_csv_data(), ...) (its return value)
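A related issue worth flagging: the pandas calls in process_into_json discard their results. groupby, apply, reset_index, and rename all return new objects rather than modifying data in place, so the chain must be assigned back to a variable. A minimal sketch of the intended nesting step, using a tiny inline dataset in place of your csv_path Variable (the column names here are a guess based on your code):

```python
import io
import pandas as pd

# Tiny inline stand-in for the real CSV (columns guessed from the code)
csv_text = """show_id,type,title,director,country,release_year
s1,Movie,A,Dir1,US,2020
s2,TV Show,B,Dir2,IN,2021
"""
data = pd.read_csv(io.StringIO(csv_text), index_col='show_id')

# groupby/apply/reset_index/rename all return NEW objects,
# so the result of the chain must be captured in a variable
nested = (
    data.groupby(['show_id', 'type', 'title'])
        .apply(lambda x: x[['director', 'country', 'release_year']].to_dict('records'))
        .reset_index()
        .rename(columns={0: 'details'})
)
print(nested.to_json(orient='records'))
```

Without the assignment to nested, the transformed frame is simply thrown away and the original data is what reaches the next step.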

with DAG("csv-to-json-conversion",
        schedule_interval='@daily',
        start_date=datetime(2022, 4, 4),
        default_args=default_args,
        tags=['conversion_of_data_file']) as dag:
        
        fetchdata=PythonOperator(task_id='fetch_data',python_callable=load_csv_data,dag=dag)
        
        processdata=PythonOperator(task_id='process_data',python_callable=process_into_json,dag=dag)

        loaddata=PythonOperator(task_id='load_data',python_callable=store_into_json,dag=dag)
        
        start=DummyOperator(task_id="start",dag=dag)

        dead=DummyOperator(task_id="dead",dag=dag)

        end=DummyOperator(task_id="end",dag=dag)

        start>>fetchdata>>[processdata>>loaddata,dead]>>end
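One more thing to watch: even with the operator calls fixed, the three tasks cannot share the DataFrame. Each callable runs in its own process and returns only a status string, so process_into_json and store_into_json never receive data (Airflow passes small, JSON-serializable values between tasks via XCom, not whole DataFrames). Also, processdata >> loaddata inside the list is evaluated before the surrounding chain, so processdata never actually becomes downstream of fetchdata. A simple workaround is to do the whole conversion in one callable; the function below is a sketch, where the paths and the selected columns are assumptions:

```python
import pandas as pd

def csv_to_nested_json(csv_path, json_path):
    """Read the CSV, nest the per-show detail columns, and write JSON."""
    data = pd.read_csv(csv_path, index_col='show_id')
    nested = (
        data.groupby(['show_id', 'type', 'title'])
            .apply(lambda x: x[['director', 'country', 'release_year']].to_dict('records'))
            .reset_index()
            .rename(columns={0: 'details'})
    )
    # Forward slashes (or os.path.join) keep the output path portable;
    # a literal backslash as in 'data\sample-supermarket.json' is fragile
    nested.to_json(json_path, orient='records', indent=4)
    return json_path
```

In the DAG this becomes a single task, e.g. PythonOperator(task_id='convert', python_callable=csv_to_nested_json, op_kwargs={'csv_path': Variable.get('csv_path'), 'json_path': 'data/sample.json'}, dag=dag), and the chain simplifies to start >> convert >> end. If you do want separate tasks, have each one write its intermediate result to a file and push the file path through XCom rather than returning a status string.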