Airflow API 清除并重新运行任务时调用返回 400
Airflow API call returning 400 when clearing and rerunning a task
我正在设置一个 Airflow 环境,其中 运行 有几个数据块笔记本。
我已经使用创建集群的 PythonOperator 设置了 create_cluster 任务,提供权限并安装所需的库,所有这些都使用 Databricks REST API 2.0
.
我在 API 安装库的调用中遇到问题。
当我触发 DAG 运行 时,一切正常。同时,如果我正在清除 create_cluster 任务,库 API 将返回 400 响应。
我什至不确定为什么会这样。
PFB api 我正在打的电话:
response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
headers={'Authorization': 'Bearer %s' % TOKEN},
json={
"cluster_id": clusterid,
"libraries": [
{
"jar": "dbfs:/FileStore/jars/4092ccd0_a657_4de2_865a_6a413580bbcd-ojdbc8.jar"
},
{
"jar": "dbfs:/FileStore/jars/d4c36be6_f697_443e_84fd_179ce07e510a-fc98ca3a_e4c2_48de_8a76_63153afe6588_spark_salesforce_assembly_1_1_3_PR46_maxCharsPerColumn_c6b42-231af.jar"
},
{
"jar": "dbfs:/FileStore/jars/2e72bd30_5861_4f2f_ba94_b591b3c604b0-jars_a68bd7d7_5e75_493c_8720_c70ff3c1f58e_RedshiftJDBC42_1_2_12_1017_fefdf-6e9da.jar"
},
{
"maven": {
"coordinates": "com.springml:spark-salesforce_2.11:1.1.3"
}
},
{
"pypi": {
"package": "cx_Oracle"
}
},
{
"pypi": {
"package": "numpy==1.16.1"
}
},
{
"pypi": {
"package": "tabulate"
}
},
{
"pypi": {
"package": "pysftp"
}
},
{
"pypi": {
"package": "s3fs"
}
},
{
"pypi": {
"package": "regex"
}
}
]
}
)
if response_lib.status_code==200:
print('Library installation response',response_lib)
else:
print("Libraries could not be installed")
print(response_lib)
这是重试任务时 print(response_lib)
的输出:
<Response [400]>
我正在使用由 AWS 管理的 Airflow。
有人可以帮我理解为什么会这样。
Databricks 官方文档 Libraries API 2.0
已解决。
在调用安装 API 之前,提供了 10 秒的延迟。
这似乎解决了问题。
代码:
Event().wait(10)
response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
headers={'Authorization': 'Bearer %s' % TOKEN},
...........
我正在设置一个 Airflow 环境,其中 运行 有几个数据块笔记本。
我已经使用创建集群的 PythonOperator 设置了 create_cluster 任务,提供权限并安装所需的库,所有这些都使用 Databricks REST API 2.0
.
我在 API 安装库的调用中遇到问题。 当我触发 DAG 运行 时,一切正常。同时,如果我正在清除 create_cluster 任务,库 API 将返回 400 响应。 我什至不确定为什么会这样。 PFB api 我正在打的电话:
response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
headers={'Authorization': 'Bearer %s' % TOKEN},
json={
"cluster_id": clusterid,
"libraries": [
{
"jar": "dbfs:/FileStore/jars/4092ccd0_a657_4de2_865a_6a413580bbcd-ojdbc8.jar"
},
{
"jar": "dbfs:/FileStore/jars/d4c36be6_f697_443e_84fd_179ce07e510a-fc98ca3a_e4c2_48de_8a76_63153afe6588_spark_salesforce_assembly_1_1_3_PR46_maxCharsPerColumn_c6b42-231af.jar"
},
{
"jar": "dbfs:/FileStore/jars/2e72bd30_5861_4f2f_ba94_b591b3c604b0-jars_a68bd7d7_5e75_493c_8720_c70ff3c1f58e_RedshiftJDBC42_1_2_12_1017_fefdf-6e9da.jar"
},
{
"maven": {
"coordinates": "com.springml:spark-salesforce_2.11:1.1.3"
}
},
{
"pypi": {
"package": "cx_Oracle"
}
},
{
"pypi": {
"package": "numpy==1.16.1"
}
},
{
"pypi": {
"package": "tabulate"
}
},
{
"pypi": {
"package": "pysftp"
}
},
{
"pypi": {
"package": "s3fs"
}
},
{
"pypi": {
"package": "regex"
}
}
]
}
)
if response_lib.status_code==200:
print('Library installation response',response_lib)
else:
print("Libraries could not be installed")
print(response_lib)
这是重试任务时 print(response_lib)
的输出:
<Response [400]>
我正在使用由 AWS 管理的 Airflow。 有人可以帮我理解为什么会这样。 Databricks 官方文档 Libraries API 2.0
已解决。 在调用安装 API 之前,提供了 10 秒的延迟。 这似乎解决了问题。
代码:
Event().wait(10)
response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
headers={'Authorization': 'Bearer %s' % TOKEN},
...........