Airflow API 清除并重新运行任务时调用返回 400

Airflow API call returning 400 when clearing and rerunning a task

我正在设置一个 Airflow 环境,其中 运行 有几个数据块笔记本。 我已经使用创建集群的 PythonOperator 设置了 create_cluster 任务,提供权限并安装所需的库,所有这些都使用 Databricks REST API 2.0.

我在 API 安装库的调用中遇到问题。 当我触发 DAG 运行 时,一切正常。同时,如果我正在清除 create_cluster 任务,库 API 将返回 400 响应。 我什至不确定为什么会这样。 PFB api 我正在打的电话:

response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
        headers={'Authorization': 'Bearer %s' % TOKEN},
        json={
            "cluster_id": clusterid,
            "libraries": [
                {
                    "jar": "dbfs:/FileStore/jars/4092ccd0_a657_4de2_865a_6a413580bbcd-ojdbc8.jar"
                },
                {
                    "jar": "dbfs:/FileStore/jars/d4c36be6_f697_443e_84fd_179ce07e510a-fc98ca3a_e4c2_48de_8a76_63153afe6588_spark_salesforce_assembly_1_1_3_PR46_maxCharsPerColumn_c6b42-231af.jar"
                },
                {
                    "jar": "dbfs:/FileStore/jars/2e72bd30_5861_4f2f_ba94_b591b3c604b0-jars_a68bd7d7_5e75_493c_8720_c70ff3c1f58e_RedshiftJDBC42_1_2_12_1017_fefdf-6e9da.jar"
                },
                {
                    "maven": {
                        "coordinates": "com.springml:spark-salesforce_2.11:1.1.3"
                        }
                },
                {
                    "pypi": {
                        "package": "cx_Oracle"
                        }
                },
                {
                    "pypi": {
                        "package": "numpy==1.16.1"
                        }
                },
                {
                    "pypi": {
                        "package": "tabulate"
                        }
                },
                {
                    "pypi": {
                        "package": "pysftp"
                        }
                },
                {
                    "pypi": {
                        "package": "s3fs"
                        }
                },
                {
                    "pypi": {
                        "package": "regex"
                        }
                }
    
                ]
            }
        )
        
        if response_lib.status_code==200:
            print('Library installation response',response_lib)
        else:
            print("Libraries could not be installed")
            print(response_lib)
        

这是重试任务时 print(response_lib) 的输出: <Response [400]>

我正在使用由 AWS 管理的 Airflow。 有人可以帮我理解为什么会这样。 Databricks 官方文档 Libraries API 2.0

已解决。 在调用安装 API 之前,提供了 10 秒的延迟。 这似乎解决了问题。

代码:

Event().wait(10)
response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
        headers={'Authorization': 'Bearer %s' % TOKEN},
...........