如何从多个 API 调用中更新 pandas 数据框
How to update a pandas dataframe, from multiple API calls
我需要编写一个 python 脚本来
- 读取包含列 (
person_id
、name
、flag
) 的 csv 文件。该文件有 3000 行。
- 基于 csv 文件中的
person_id
,我需要调用 URL 传递 person_id
来执行 GET
http://api.myendpoint.intranet/get-data/1234
URL 将 return person_id
的一些信息,如下例所示。我需要获取所有租金对象并保存在我的 csv 上。我的输出需要像这样
import pandas as pd
import requests
ids = pd.read_csv(f"{path}/data.csv", delimiter=';')
person_rents = df = pd.DataFrame([], columns=list('person_id','carId','price','rentStatus'))
for id in ids:
response = request.get(f'endpoint/{id["person_id"]}')
json = response.json()
person_rents.append( [person_id, rent['carId'], rent['price'], rent['rentStatus'] ] )
pd.read_csv(f"{path}/data.csv", delimiter=';' )
person_id;name;flag;cardId;price;rentStatus
1000;Joseph;1;6638;1000;active
1000;Joseph;1;5566;2000;active
响应示例
{
"active": false,
"ctodx": false,
"rents": [{
"carId": 6638,
"price": 1000,
"rentStatus": "active"
}, {
"carId": 5566,
"price": 2000,
"rentStatus": "active"
}
],
"responseCode": "OK",
"status": [{
"request": 345,
"requestStatus": "F"
}, {
"requestId": 678,
"requestStatus": "P"
}
],
"transaction": false
}
- 在 csv 上保存来自响应的附加数据后,我需要使用 URL 上的 carId 从另一个端点获取数据。里程结果必须保存在同一个 csv 中。
http://api.myendpoint.intranet/get-mileage/6638
http://api.myendpoint.intranet/get-mileage/5566
每次调用的return会是这样
{"mileage":1000.0000}
{"mileage":550.0000}
最终输出必须是
person_id;name;flag;cardId;price;rentStatus;mileage
1000;Joseph;1;6638;1000;active;1000.0000
1000;Joseph;1;5566;2000;active;550.0000
有人可以帮我写这个脚本吗?
可以与 pandas 或任何 python 3 lib.
有许多不同的方法可以实现这一点。其中之一是,就像您在评论中开始的那样:
- 使用 pandas
读取 CSV 文件
- 为每一行取 person_id 并构建一个调用
- 然后可以从租金
中获取交付的JSON响应
- 然后为每个单独的租赁提取 carId
- 最后将其收集在 row_list
- 然后 row_list 通过 pandas
转换回 csv
没有任何错误处理的非常简单的解决方案可能如下所示:
from types import SimpleNamespace
import pandas as pd
import requests
import json
path = '/some/path/'
df = pd.read_csv(f'{path}/data.csv', delimiter=';')
rows_list = []
for _, row in df.iterrows():
rentCall = f'http://api.myendpoint.intranet/get-data/{row.person_id}'
print(rentCall)
response = requests.get(rentCall)
r = json.loads(response.text, object_hook=lambda d: SimpleNamespace(**d))
for rent in r.rents:
mileageCall = f'http://api.myendpoint.intranet/get-mileage/{rent.carId}'
print(mileageCall)
response2 = requests.get(mileageCall)
m = json.loads(response2.text, object_hook=lambda d: SimpleNamespace(**d))
state = "active" if r.active else "inactive"
rows_list.append((row['person_id'], row['name'], row['flag'], rent.carId, rent.price, state, m.mileage))
df = pd.DataFrame(rows_list, columns=('person_id', 'name', 'flag', 'carId', 'price', 'rentStatus', 'mileage'))
print(df.to_csv(index=False, sep=';'))
代码说明
- 创建数据框,
df
,pd.read_csv
。
- 预计
'person_id'
中的所有值都是唯一的。
- 在
'person_id'
上使用 .apply
来调用 prepare_data
。
prepare_data
期望 'person_id'
是 str
或 int
,如类型注释所示,Union[int, str]
- 调用
API
,这将 return 一个 dict
,到 prepare_data
函数。
- 将
dict
的 'rents'
键转换为数据帧,pd.json_normalize
。
- 在
'carId'
上使用 .apply
来调用 API
,并提取作为列添加到数据帧 data
的 'mileage'
.
- 将
'person_id'
添加到data
,可用于合并df
和s
。
- 将
pd.Series
、s
转换为数据帧,使用pd.concat
,然后merge
、df
和s
,在person_id
.
- 以所需格式
pd.to_csv
保存到 csv。
潜在问题
- 如果有问题,最有可能出现在
call_api
函数中。
- 只要
call_api
return 是 dict
,就像问题中显示的响应一样,代码的其余部分将正常工作以产生所需的输出。
import pandas as pd
import requests
import json
from typing import Union
def call_api(url: str) -> dict:
r = requests.get(url)
return r.json()
def prepare_data(uid: Union[int, str]) -> pd.DataFrame:
d_url = f'http://api.myendpoint.intranet/get-data/{uid}'
m_url = 'http://api.myendpoint.intranet/get-mileage/'
# get the rent data from the api call
rents = call_api(d_url)['rents']
# normalize rents into a dataframe
data = pd.json_normalize(rents)
# get the mileage data from the api call and add it to data as a column
data['mileage'] = data.carId.apply(lambda cid: call_api(f'{m_url}{cid}')['mileage'])
# add person_id as a column to data, which will be used to merge data to df
data['person_id'] = uid
return data
# read data from file
df = pd.read_csv('file.csv', sep=';')
# call prepare_data
s = df.person_id.apply(prepare_data)
# s is a Series of DataFrames, which can be combined with pd.concat
s = pd.concat([v for v in s])
# join df with s, on person_id
df = df.merge(s, on='person_id')
# save to csv
df.to_csv('output.csv', sep=';', index=False)
- 如果运行这段代码有错误:
- 发表评论,让我知道。
- edit 您的问题,并将整个
TraceBack
作为文本粘贴到代码块中。
例子
# given the following start dataframe
person_id name flag
0 1000 Joseph 1
1 400 Sam 1
# resulting dataframe using the same data for both id 1000 and 400
person_id name flag carId price rentStatus mileage
0 1000 Joseph 1 6638 1000 active 1000.0
1 1000 Joseph 1 5566 2000 active 1000.0
2 400 Sam 1 6638 1000 active 1000.0
3 400 Sam 1 5566 2000 active 1000.0
通过多处理加速
您提到您有 3000 行,这意味着您将不得不进行很多次 API 调用。根据连接情况,这些调用中的每一个都可能需要一段时间。因此,以顺序方式执行此操作可能会太慢。大多数时候,您的程序只会等待服务器的响应,而不做任何其他事情。
我们可以通过使用 multiprocessing.
来提高这个性能
我使用了 中的所有代码,但我替换了以下顺序调用:
# call prepare_data
s = df.person_id.apply(prepare_data)
使用并行替代方案:
from multiprocessing import Pool
n_processes=20 # Experiment with this to see what works well
with Pool(n_processes) as p:
s=p.map(prepare_data, df.person_id)
或者,线程池可能更快,但您必须通过将导入替换为
from multiprocessing.pool import ThreadPool as Pool
.
我需要编写一个 python 脚本来
- 读取包含列 (
person_id
、name
、flag
) 的 csv 文件。该文件有 3000 行。 - 基于 csv 文件中的
person_id
,我需要调用 URL 传递person_id
来执行 GET http://api.myendpoint.intranet/get-data/1234 URL 将 returnperson_id
的一些信息,如下例所示。我需要获取所有租金对象并保存在我的 csv 上。我的输出需要像这样
import pandas as pd
import requests
ids = pd.read_csv(f"{path}/data.csv", delimiter=';')
person_rents = df = pd.DataFrame([], columns=list('person_id','carId','price','rentStatus'))
for id in ids:
response = request.get(f'endpoint/{id["person_id"]}')
json = response.json()
person_rents.append( [person_id, rent['carId'], rent['price'], rent['rentStatus'] ] )
pd.read_csv(f"{path}/data.csv", delimiter=';' )
person_id;name;flag;cardId;price;rentStatus
1000;Joseph;1;6638;1000;active
1000;Joseph;1;5566;2000;active
响应示例
{
"active": false,
"ctodx": false,
"rents": [{
"carId": 6638,
"price": 1000,
"rentStatus": "active"
}, {
"carId": 5566,
"price": 2000,
"rentStatus": "active"
}
],
"responseCode": "OK",
"status": [{
"request": 345,
"requestStatus": "F"
}, {
"requestId": 678,
"requestStatus": "P"
}
],
"transaction": false
}
- 在 csv 上保存来自响应的附加数据后,我需要使用 URL 上的 carId 从另一个端点获取数据。里程结果必须保存在同一个 csv 中。 http://api.myendpoint.intranet/get-mileage/6638 http://api.myendpoint.intranet/get-mileage/5566
每次调用的return会是这样
{"mileage":1000.0000}
{"mileage":550.0000}
最终输出必须是
person_id;name;flag;cardId;price;rentStatus;mileage
1000;Joseph;1;6638;1000;active;1000.0000
1000;Joseph;1;5566;2000;active;550.0000
有人可以帮我写这个脚本吗? 可以与 pandas 或任何 python 3 lib.
有许多不同的方法可以实现这一点。其中之一是,就像您在评论中开始的那样:
- 使用 pandas 读取 CSV 文件
- 为每一行取 person_id 并构建一个调用
- 然后可以从租金 中获取交付的JSON响应
- 然后为每个单独的租赁提取 carId
- 最后将其收集在 row_list
- 然后 row_list 通过 pandas 转换回 csv
没有任何错误处理的非常简单的解决方案可能如下所示:
from types import SimpleNamespace
import pandas as pd
import requests
import json
path = '/some/path/'
df = pd.read_csv(f'{path}/data.csv', delimiter=';')
rows_list = []
for _, row in df.iterrows():
rentCall = f'http://api.myendpoint.intranet/get-data/{row.person_id}'
print(rentCall)
response = requests.get(rentCall)
r = json.loads(response.text, object_hook=lambda d: SimpleNamespace(**d))
for rent in r.rents:
mileageCall = f'http://api.myendpoint.intranet/get-mileage/{rent.carId}'
print(mileageCall)
response2 = requests.get(mileageCall)
m = json.loads(response2.text, object_hook=lambda d: SimpleNamespace(**d))
state = "active" if r.active else "inactive"
rows_list.append((row['person_id'], row['name'], row['flag'], rent.carId, rent.price, state, m.mileage))
df = pd.DataFrame(rows_list, columns=('person_id', 'name', 'flag', 'carId', 'price', 'rentStatus', 'mileage'))
print(df.to_csv(index=False, sep=';'))
代码说明
- 创建数据框,
df
,pd.read_csv
。- 预计
'person_id'
中的所有值都是唯一的。
- 预计
- 在
'person_id'
上使用.apply
来调用prepare_data
。prepare_data
期望'person_id'
是str
或int
,如类型注释所示,Union[int, str]
- 调用
API
,这将 return 一个dict
,到prepare_data
函数。 - 将
dict
的'rents'
键转换为数据帧,pd.json_normalize
。 - 在
'carId'
上使用.apply
来调用API
,并提取作为列添加到数据帧data
的'mileage'
. - 将
'person_id'
添加到data
,可用于合并df
和s
。 - 将
pd.Series
、s
转换为数据帧,使用pd.concat
,然后merge
、df
和s
,在person_id
. - 以所需格式
pd.to_csv
保存到 csv。
潜在问题
- 如果有问题,最有可能出现在
call_api
函数中。 - 只要
call_api
return 是dict
,就像问题中显示的响应一样,代码的其余部分将正常工作以产生所需的输出。
import pandas as pd
import requests
import json
from typing import Union
def call_api(url: str) -> dict:
r = requests.get(url)
return r.json()
def prepare_data(uid: Union[int, str]) -> pd.DataFrame:
d_url = f'http://api.myendpoint.intranet/get-data/{uid}'
m_url = 'http://api.myendpoint.intranet/get-mileage/'
# get the rent data from the api call
rents = call_api(d_url)['rents']
# normalize rents into a dataframe
data = pd.json_normalize(rents)
# get the mileage data from the api call and add it to data as a column
data['mileage'] = data.carId.apply(lambda cid: call_api(f'{m_url}{cid}')['mileage'])
# add person_id as a column to data, which will be used to merge data to df
data['person_id'] = uid
return data
# read data from file
df = pd.read_csv('file.csv', sep=';')
# call prepare_data
s = df.person_id.apply(prepare_data)
# s is a Series of DataFrames, which can be combined with pd.concat
s = pd.concat([v for v in s])
# join df with s, on person_id
df = df.merge(s, on='person_id')
# save to csv
df.to_csv('output.csv', sep=';', index=False)
- 如果运行这段代码有错误:
- 发表评论,让我知道。
- edit 您的问题,并将整个
TraceBack
作为文本粘贴到代码块中。
例子
# given the following start dataframe
person_id name flag
0 1000 Joseph 1
1 400 Sam 1
# resulting dataframe using the same data for both id 1000 and 400
person_id name flag carId price rentStatus mileage
0 1000 Joseph 1 6638 1000 active 1000.0
1 1000 Joseph 1 5566 2000 active 1000.0
2 400 Sam 1 6638 1000 active 1000.0
3 400 Sam 1 5566 2000 active 1000.0
通过多处理加速
您提到您有 3000 行,这意味着您将不得不进行很多次 API 调用。根据连接情况,这些调用中的每一个都可能需要一段时间。因此,以顺序方式执行此操作可能会太慢。大多数时候,您的程序只会等待服务器的响应,而不做任何其他事情。 我们可以通过使用 multiprocessing.
来提高这个性能我使用了
# call prepare_data
s = df.person_id.apply(prepare_data)
使用并行替代方案:
from multiprocessing import Pool
n_processes=20 # Experiment with this to see what works well
with Pool(n_processes) as p:
s=p.map(prepare_data, df.person_id)
或者,线程池可能更快,但您必须通过将导入替换为
from multiprocessing.pool import ThreadPool as Pool
.