使用 dask 发送并行 API 请求和错误处理
using dask for sending parallel API request and error handling
我最近开始使用dask。我想使用 http 请求将数据发送到 REST API,API return 一个 json 文件来验证数据上传是否成功。这是我的 API 调用函数:
def requestToAPI():
headers={'Content-Type': 'application/json'}
data = {
"api_key" : "xxxxxxxxxxxxx",
"attributes" : [
{
"external_id" : "user1",
"app_id" : "xxxx-xxx-xxxxx-xxxx",
"firs_name" : "user_firstname",
"last_name" : "user_lastname_test"
}
]
}
r = requests.post('https://abcdf.com/users/abdcgdu', headers=headers, data=json.dumps(data))
return r.json()
我从下面的代码中获得了一些 dask 数据帧块:
rChunk=dd.from_pandas(pandaDataFrame, chunksize=1000)
我如何使用 dask 并使用上面的块(假设每个块将更改为正确的 json 文件)将并行请求发送到 API 并在其中一个请求 fail/return 错误?
我尝试使用 dask.delayed:
[延迟(requestToAPI)(块)用于 rChunk 中的块]
但不确定如何正确处理错误?
我不确定 dask 数据框是否是您应用程序的最佳选择。您可能想要查看 delayed、futures 或 bag API。
我可能会使用 concurrent.futures
from dask.distributed import Client, as_completed
futures = client.map(process, requests)
for future in as_completed(futures):
try:
response = future.result()
# do stuff with result
except Exeption:
# do stuff
我最近开始使用dask。我想使用 http 请求将数据发送到 REST API,API return 一个 json 文件来验证数据上传是否成功。这是我的 API 调用函数:
def requestToAPI():
headers={'Content-Type': 'application/json'}
data = {
"api_key" : "xxxxxxxxxxxxx",
"attributes" : [
{
"external_id" : "user1",
"app_id" : "xxxx-xxx-xxxxx-xxxx",
"firs_name" : "user_firstname",
"last_name" : "user_lastname_test"
}
]
}
r = requests.post('https://abcdf.com/users/abdcgdu', headers=headers, data=json.dumps(data))
return r.json()
我从下面的代码中获得了一些 dask 数据帧块:
rChunk=dd.from_pandas(pandaDataFrame, chunksize=1000)
我如何使用 dask 并使用上面的块(假设每个块将更改为正确的 json 文件)将并行请求发送到 API 并在其中一个请求 fail/return 错误?
我尝试使用 dask.delayed:
[延迟(requestToAPI)(块)用于 rChunk 中的块]
但不确定如何正确处理错误?
我不确定 dask 数据框是否是您应用程序的最佳选择。您可能想要查看 delayed、futures 或 bag API。
我可能会使用 concurrent.futures
from dask.distributed import Client, as_completed
futures = client.map(process, requests)
for future in as_completed(futures):
try:
response = future.result()
# do stuff with result
except Exeption:
# do stuff