当 PythonOperator 尝试调用 API 并下载数据时 Airflow DAG 失败

Airflow DAG fails when PythonOperator tries to call API and download data

我第一次尝试在我的笔记本电脑上配置 Airflow(不使用 docker,仅遵循文档)。我的目标是设置一个简单的 ETL 作业。

我用一个 PythonOperator 编写了最简单的 DAG:

from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),

}
dag = DAG(
    'airflow_dag_tutorial-new',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)


run_etl = PythonOperator(
        task_id='main_task',
        python_callable=spotify_etl,
        dag=dag,
    )

run_etl

当我通过打印语句传递虚拟函数时,DAG 成功运行。但是,当我传递调用 Spotify API 的函数 spotify_etl 时,DAG 失败了。这是函数:

def spotify_etl():
    token = 'xxx'

    headers = {
    'Accept' : "application/json",
    'Content-Type': "application/json",
    'Authorization': 'Bearer {token}'.format(token=token)    
    }  


    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=100)
    yesterday_unix_timestamp = int(yesterday.timestamp()) *1000


    r = requests.get("https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp), headers=headers)
    data = r.json()
    print(data)

我得到的错误是:

[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT

有谁知道如何为调用 API 的函数正确使用 PythonOperator?是什么导致了这个错误?

我尝试在我的 venv 中设置:export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES(如此处建议: and here: https://github.com/ansible/ansible/issues/32499#issuecomment-341578864),但似乎没有解决问题。

原来是“export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES”设置不正确。它必须添加到 .zshrc 而不是 .bash_profile。那解决了。