Azure Databricks API 创建作业,成功调用 API 后未创建作业
Azure Databricks API to create job, job doesn't get created after successful call to the API
我正在使用 python 3.6 对 Azure Databricks 进行 API 调用,以创建 运行 特定笔记本的作业。我已按照 this link 处使用 API 的说明进行操作。唯一的区别是我使用的是 python 而不是 curl。我写的代码如下:
import requests
import os
import json
dbrks_create_job_url = "https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net//2.0/jobs/create"
DBRKS_REQ_HEADERS = {
'Authorization': 'Bearer ' + os.environ['DBRKS_BEARER_TOKEN'],
'X-Databricks-Azure-Workspace-Resource-Id': '/subscriptions/'+ os.environ['DBRKS_SUBSCRIPTION_ID'] +'/resourceGroups/'+ os.environ['DBRKS_RESOURCE_GROUP'] +'/providers/Microsoft.Databricks/workspaces/' + os.environ['DBRKS_WORKSPACE_NAME'],
'X-Databricks-Azure-SP-Management-Token': os.environ['DBRKS_MANAGEMENT_TOKEN']}
body_json = """
{
"name": "A sample job to trigger from DevOps",
"tasks": [
{
"task_key": "ExecuteNotebook",
"description": "Execute uploaded notebook including tests",
"depends_on": [],
"existing_cluster_id": """ + os.environ["DBRKS_CLUSTER_ID"] + """,
"notebook_task": {
"notebook_path": "/Users/myuser/sample-notebook",
"base_parameters": {}
},
"timeout_seconds": 300,
"max_retries": 1,
"min_retry_interval_millis": 5000,
"retry_on_timeout": false
}
],
"email_notifications": {},
"name": "my_test_job",
"max_concurrent_runs": 1}
"""
print("Request body in json format:")
print(body_json)
response = requests.post(dbrks_create_job_url, headers=DBRKS_REQ_HEADERS, data=body_json)
if response.status_code == 200:
print("Job created successfully!")
print(response.status_code)
print(response.content)
else:
print("job failed!")
raise Exception(response.content)
所有 OS 环境变量都是从我的 Azure DevOps 管道发送的。但是,您不需要从管道执行脚本。只要您有一个可以访问数据块工作区的服务主体,就可以从本地计算机执行它。对于 运行 python 脚本,您可以将这些环境变量替换为您自己的凭据。
解释脚本中的变量:
os.environ['DBRKS_INSTANCE']:数据块实例的名称
os.environ['DBRKS_BEARER_TOKEN']:不记名令牌。您需要它来验证您的服务主体或您的用户到数据块。稍后我会解释你如何获得它。
os.environ['DBRKS_MANAGEMENT_TOKEN']:如果您使用的服务原则未添加为数据块工作区用户或管理员,则需要此令牌。稍后我会解释你如何获得它。
os.environ['DBRKS_SUBSCRIPTION_ID']: databricks 工作区所在的 Azure 订阅 ID。
os.environ['DBRKS_RESOURCE_GROUP']: databricks 工作区的 Azure 资源组名称。
os.environ['DBRKS_WORKSPACE_NAME']:Azure 数据块工作区的名称。
os.environ["DBRKS_CLUSTER_ID"]:将在数据块中执行作业的集群 ID。
当我 运行 我的脚本时,我得到状态代码 200,这意味着它应该正常工作,如下所示:
但是,当我查看作业列表时,尽管收到了 200 状态代码,但没有创建新作业!您可以在下面看到我创建的作业不存在。
我也将API端点从azuredatabricks.net//2.0/jobs/create更改为azuredatabricks.net//2.1/jobs/create,但我还是成功了运行 但没有创建任何工作!我不明白我做错了什么。如果我做错了什么,为什么它不会引发异常并给我 200 状态码。
能够重现我面临的问题的最后一点:要获得 DBRKS_BEARER_TOKEN 和 DBRKS_MANAGEMENT_TOKEN 的上述两个变量,您可以 运行 以下脚本并手动将 os.environ['DBRKS_BEARER_TOKEN'] 和 os.environ['DBRKS_MANAGEMENT_TOKEN'] 替换为脚本执行后的打印值:
import requests
import json
import os
TOKEN_BASE_URL = 'https://login.microsoftonline.com/' + os.environ['SVCDirectoryID'] + '/oauth2/token'
TOKEN_REQ_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
TOKEN_REQ_BODY = {
'grant_type': 'client_credentials',
'client_id': os.environ['SVCApplicationID'],
'client_secret': os.environ['SVCSecretKey']}
def dbrks_management_token():
TOKEN_REQ_BODY['resource'] = 'https://management.core.windows.net/'
response = requests.get(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
if response.status_code == 200:
print(response.status_code)
else:
raise Exception(response.text)
return response.json()['access_token']
def dbrks_bearer_token():
TOKEN_REQ_BODY['resource'] = '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d'
response = requests.get(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
if response.status_code == 200:
print(response.status_code)
else:
raise Exception(response.text)
return response.json()['access_token']
DBRKS_BEARER_TOKEN = dbrks_bearer_token()
DBRKS_MANAGEMENT_TOKEN = dbrks_management_token()
os.environ['DBRKS_BEARER_TOKEN'] = DBRKS_BEARER_TOKEN
os.environ['DBRKS_MANAGEMENT_TOKEN'] = DBRKS_MANAGEMENT_TOKEN
print("DBRKS_BEARER_TOKEN",os.environ['DBRKS_BEARER_TOKEN'])
print("DBRKS_MANAGEMENT_TOKEN",os.environ['DBRKS_MANAGEMENT_TOKEN'])
- SVCDirectoryID 是 Azure Active Directory (AAD) 服务主体租户 ID
- SVCApplicationID 是 AAD 服务主体客户端 ID 的值。
- SVCSecretKey 是 AAD 服务主体密钥。
感谢您的宝贵意见。
您混淆了 API 版本 - tasks
数组只能与 Jobs API 2.1 一起使用,但您使用的是 Jobs API 2.0。另一个错误是主机名和路径之间有 //
。
只需将dbrks_create_job_url
更改为"https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net/api/2.1/jobs/create"
我正在使用 python 3.6 对 Azure Databricks 进行 API 调用,以创建 运行 特定笔记本的作业。我已按照 this link 处使用 API 的说明进行操作。唯一的区别是我使用的是 python 而不是 curl。我写的代码如下:
import requests
import os
import json
dbrks_create_job_url = "https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net//2.0/jobs/create"
DBRKS_REQ_HEADERS = {
'Authorization': 'Bearer ' + os.environ['DBRKS_BEARER_TOKEN'],
'X-Databricks-Azure-Workspace-Resource-Id': '/subscriptions/'+ os.environ['DBRKS_SUBSCRIPTION_ID'] +'/resourceGroups/'+ os.environ['DBRKS_RESOURCE_GROUP'] +'/providers/Microsoft.Databricks/workspaces/' + os.environ['DBRKS_WORKSPACE_NAME'],
'X-Databricks-Azure-SP-Management-Token': os.environ['DBRKS_MANAGEMENT_TOKEN']}
body_json = """
{
"name": "A sample job to trigger from DevOps",
"tasks": [
{
"task_key": "ExecuteNotebook",
"description": "Execute uploaded notebook including tests",
"depends_on": [],
"existing_cluster_id": """ + os.environ["DBRKS_CLUSTER_ID"] + """,
"notebook_task": {
"notebook_path": "/Users/myuser/sample-notebook",
"base_parameters": {}
},
"timeout_seconds": 300,
"max_retries": 1,
"min_retry_interval_millis": 5000,
"retry_on_timeout": false
}
],
"email_notifications": {},
"name": "my_test_job",
"max_concurrent_runs": 1}
"""
print("Request body in json format:")
print(body_json)
response = requests.post(dbrks_create_job_url, headers=DBRKS_REQ_HEADERS, data=body_json)
if response.status_code == 200:
print("Job created successfully!")
print(response.status_code)
print(response.content)
else:
print("job failed!")
raise Exception(response.content)
所有 OS 环境变量都是从我的 Azure DevOps 管道发送的。但是,您不需要从管道执行脚本。只要您有一个可以访问数据块工作区的服务主体,就可以从本地计算机执行它。对于 运行 python 脚本,您可以将这些环境变量替换为您自己的凭据。
解释脚本中的变量:
os.environ['DBRKS_INSTANCE']:数据块实例的名称
os.environ['DBRKS_BEARER_TOKEN']:不记名令牌。您需要它来验证您的服务主体或您的用户到数据块。稍后我会解释你如何获得它。
os.environ['DBRKS_MANAGEMENT_TOKEN']:如果您使用的服务原则未添加为数据块工作区用户或管理员,则需要此令牌。稍后我会解释你如何获得它。
os.environ['DBRKS_SUBSCRIPTION_ID']: databricks 工作区所在的 Azure 订阅 ID。
os.environ['DBRKS_RESOURCE_GROUP']: databricks 工作区的 Azure 资源组名称。
os.environ['DBRKS_WORKSPACE_NAME']:Azure 数据块工作区的名称。
os.environ["DBRKS_CLUSTER_ID"]:将在数据块中执行作业的集群 ID。
当我 运行 我的脚本时,我得到状态代码 200,这意味着它应该正常工作,如下所示:
但是,当我查看作业列表时,尽管收到了 200 状态代码,但没有创建新作业!您可以在下面看到我创建的作业不存在。
我也将API端点从azuredatabricks.net//2.0/jobs/create更改为azuredatabricks.net//2.1/jobs/create,但我还是成功了运行 但没有创建任何工作!我不明白我做错了什么。如果我做错了什么,为什么它不会引发异常并给我 200 状态码。
能够重现我面临的问题的最后一点:要获得 DBRKS_BEARER_TOKEN 和 DBRKS_MANAGEMENT_TOKEN 的上述两个变量,您可以 运行 以下脚本并手动将 os.environ['DBRKS_BEARER_TOKEN'] 和 os.environ['DBRKS_MANAGEMENT_TOKEN'] 替换为脚本执行后的打印值:
import requests
import json
import os
TOKEN_BASE_URL = 'https://login.microsoftonline.com/' + os.environ['SVCDirectoryID'] + '/oauth2/token'
TOKEN_REQ_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
TOKEN_REQ_BODY = {
'grant_type': 'client_credentials',
'client_id': os.environ['SVCApplicationID'],
'client_secret': os.environ['SVCSecretKey']}
def dbrks_management_token():
TOKEN_REQ_BODY['resource'] = 'https://management.core.windows.net/'
response = requests.get(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
if response.status_code == 200:
print(response.status_code)
else:
raise Exception(response.text)
return response.json()['access_token']
def dbrks_bearer_token():
TOKEN_REQ_BODY['resource'] = '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d'
response = requests.get(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
if response.status_code == 200:
print(response.status_code)
else:
raise Exception(response.text)
return response.json()['access_token']
DBRKS_BEARER_TOKEN = dbrks_bearer_token()
DBRKS_MANAGEMENT_TOKEN = dbrks_management_token()
os.environ['DBRKS_BEARER_TOKEN'] = DBRKS_BEARER_TOKEN
os.environ['DBRKS_MANAGEMENT_TOKEN'] = DBRKS_MANAGEMENT_TOKEN
print("DBRKS_BEARER_TOKEN",os.environ['DBRKS_BEARER_TOKEN'])
print("DBRKS_MANAGEMENT_TOKEN",os.environ['DBRKS_MANAGEMENT_TOKEN'])
- SVCDirectoryID 是 Azure Active Directory (AAD) 服务主体租户 ID
- SVCApplicationID 是 AAD 服务主体客户端 ID 的值。
- SVCSecretKey 是 AAD 服务主体密钥。
感谢您的宝贵意见。
您混淆了 API 版本 - tasks
数组只能与 Jobs API 2.1 一起使用,但您使用的是 Jobs API 2.0。另一个错误是主机名和路径之间有 //
。
只需将dbrks_create_job_url
更改为"https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net/api/2.1/jobs/create"