Airflow SimpleHttpOperator 没有推送到 xcom
Airflow SimpleHttpOperator is not pushing to xcom
我的 dag 中有以下 SimpleHttpOperator:
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/', # Some Api already configured and checked
method="GET",
response_filter=lambda response: json.loads(response.text),
log_response=True,
do_xcom_push=True,
)
后跟 PythonOperator:
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
函数:
def _processing_user(ti):
users = ti.xcom_pull(task_ids=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError(f'User is empty')
**More function code**
当我执行 airflow tasks test myDag extracting_user 2022-03-02
后跟 airflow tasks test myDag processing_user 2022-03-02
时,我得到值错误,用户变量等于一个空数组。
我单独测试了 extracting_user 任务,它从 API 中获取了所需的数据。我已经用 sqlite xcom 查询过,它是一个空 table.
我正在使用 airflow 2.3.0
换airflow 2.0.0版本问题解决了。 SimpleHttpOperator 似乎没有在 2.3.0 版本
上的 xcom table 上存储请求响应
我的 dag 中有以下 SimpleHttpOperator:
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/', # Some Api already configured and checked
method="GET",
response_filter=lambda response: json.loads(response.text),
log_response=True,
do_xcom_push=True,
)
后跟 PythonOperator:
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
函数:
def _processing_user(ti):
users = ti.xcom_pull(task_ids=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError(f'User is empty')
**More function code**
当我执行 airflow tasks test myDag extracting_user 2022-03-02
后跟 airflow tasks test myDag processing_user 2022-03-02
时,我得到值错误,用户变量等于一个空数组。
我单独测试了 extracting_user 任务,它从 API 中获取了所需的数据。我已经用 sqlite xcom 查询过,它是一个空 table.
我正在使用 airflow 2.3.0
换airflow 2.0.0版本问题解决了。 SimpleHttpOperator 似乎没有在 2.3.0 版本
上的 xcom table 上存储请求响应