当 PythonOperator 尝试调用 API 并下载数据时 Airflow DAG 失败
Airflow DAG fails when PythonOperator tries to call API and download data
我第一次尝试在我的笔记本电脑上配置 Airflow(不使用 docker,仅遵循文档)。我的目标是设置一个简单的 ETL 作业。
我用一个 PythonOperator 编写了最简单的 DAG:
from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
'airflow_dag_tutorial-new',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
run_etl = PythonOperator(
task_id='main_task',
python_callable=spotify_etl,
dag=dag,
)
run_etl
当我通过打印语句传递虚拟函数时,DAG 成功运行。但是,当我传递调用 Spotify API 的函数 spotify_etl 时,DAG 失败了。这是函数:
def spotify_etl():
token = 'xxx'
headers = {
'Accept' : "application/json",
'Content-Type': "application/json",
'Authorization': 'Bearer {token}'.format(token=token)
}
today = datetime.datetime.now()
yesterday = today - datetime.timedelta(days=100)
yesterday_unix_timestamp = int(yesterday.timestamp()) *1000
r = requests.get("https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp), headers=headers)
data = r.json()
print(data)
我得到的错误是:
[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT
有谁知道如何为调用 API 的函数正确使用 PythonOperator?是什么导致了这个错误?
我尝试在我的 venv 中设置:export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES(如此处建议: and here: https://github.com/ansible/ansible/issues/32499#issuecomment-341578864),但似乎没有解决问题。
原来是“export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES”设置不正确。它必须添加到 .zshrc 而不是 .bash_profile。那解决了。
我第一次尝试在我的笔记本电脑上配置 Airflow(不使用 docker,仅遵循文档)。我的目标是设置一个简单的 ETL 作业。
我用一个 PythonOperator 编写了最简单的 DAG:
from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
'airflow_dag_tutorial-new',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
run_etl = PythonOperator(
task_id='main_task',
python_callable=spotify_etl,
dag=dag,
)
run_etl
当我通过打印语句传递虚拟函数时,DAG 成功运行。但是,当我传递调用 Spotify API 的函数 spotify_etl 时,DAG 失败了。这是函数:
def spotify_etl():
token = 'xxx'
headers = {
'Accept' : "application/json",
'Content-Type': "application/json",
'Authorization': 'Bearer {token}'.format(token=token)
}
today = datetime.datetime.now()
yesterday = today - datetime.timedelta(days=100)
yesterday_unix_timestamp = int(yesterday.timestamp()) *1000
r = requests.get("https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp), headers=headers)
data = r.json()
print(data)
我得到的错误是:
[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT
有谁知道如何为调用 API 的函数正确使用 PythonOperator?是什么导致了这个错误?
我尝试在我的 venv 中设置:export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES(如此处建议:
原来是“export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES”设置不正确。它必须添加到 .zshrc 而不是 .bash_profile。那解决了。