Airflow PythonOperator task fail - TypeError: The key has to be a string


Airflow newbie here, so please bear with me. I don't understand why this simple task fails:

import json

def getCarJSON():
    dictCars = {'link': '/cars/acura', 'num': '1'}
    with open('data/dictCars.json', 'w') as fp:
        json.dump(dictCars, fp)

It just writes a simple dict to disk as JSON. Why do I get:

Broken DAG: [/home/user/airflow/dags/cars.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/baseoperator.py", line 404, in __init__
    validate_key(task_id)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/helpers.py", line 39, in validate_key
    raise TypeError("The key has to be a string")
TypeError: The key has to be a string

The rest of the DAG file is the usual boilerplate:

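from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x path; 1.10.x uses airflow.operators.python_operator

# Set default args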
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 3, 23),
    'email': ['donko@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2)
}

schedule_interval = '30 09 * * *'

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    dag_id = 'get_cars',
    default_args = default_args,
    schedule_interval = schedule_interval
)


# Get cars dict
get_cars_json = PythonOperator(
    task_id=getCarJSON,
    python_callable=getCarJSON,
    dag=dag
)

I only want to dump the data to disk...

Judging from the Broken DAG message, the error is raised while Airflow validates the task_id argument.
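To see why a function object trips that validation, here is a simplified sketch of the kind of check the traceback points at. This is not Airflow's source: `validate_key_sketch` is a hypothetical stand-in, and the length limit and allowed-character pattern are assumptions; only the type check and its message come from the traceback above.

```python
import re

# Assumed pattern: letters, digits, underscores, dots and dashes.
_KEY_PATTERN = re.compile(r"^[\w.-]+$")


def validate_key_sketch(key, max_length=250):
    """Hypothetical, simplified version of the task_id check."""
    if not isinstance(key, str):
        # This branch is hit when a function object is passed as task_id.
        raise TypeError("The key has to be a string")
    if len(key) > max_length:
        raise ValueError(f"The key is longer than {max_length} characters")
    if not _KEY_PATTERN.match(key):
        raise ValueError(f"The key {key!r} contains invalid characters")


def getCarJSON():
    pass


validate_key_sketch("getCarJSON")  # passes: a plain string
try:
    validate_key_sketch(getCarJSON)  # fails: a function object, not a str
except TypeError as err:
    print(err)  # The key has to be a string
```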

In your code you pass the getCarJSON function itself to task_id instead of the task's name, which must be a string; that is what raises the TypeError. Quote the name:

get_cars_json = PythonOperator(
    task_id='getCarJSON',  # a quoted string; previously the unquoted function object was passed
    python_callable=getCarJSON,
    dag=dag
)
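A slightly more defensive variant, as a sketch under stated assumptions: taking the task_id from the callable's `__name__` keeps the two in sync and always yields a string, and writing to an absolute path avoids depending on the worker's current working directory (a relative path such as `data/dictCars.json` resolves against whatever directory the task happens to run in). `DATA_DIR` and `TASK_ID` are hypothetical names; the operator wiring is left as a comment so the snippet runs without Airflow installed.

```python
import json
import os
import tempfile

# Hypothetical output directory; replace with a real absolute path on the worker.
DATA_DIR = os.path.join(tempfile.gettempdir(), "cars_data")


def getCarJSON():
    """Write a small dict to DATA_DIR/dictCars.json and return the file path."""
    dictCars = {"link": "/cars/acura", "num": "1"}
    os.makedirs(DATA_DIR, exist_ok=True)  # create the folder if it is missing
    path = os.path.join(DATA_DIR, "dictCars.json")
    with open(path, "w") as fp:
        json.dump(dictCars, fp)
    return path


# A function's __name__ is always a str, so it is safe to use as a task_id.
TASK_ID = getCarJSON.__name__

# In the DAG file this would be wired up as:
# get_cars_json = PythonOperator(
#     task_id=TASK_ID,
#     python_callable=getCarJSON,
#     dag=dag,
# )
```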