如何从多个 API 调用中更新 pandas 数据框

How to update a pandas dataframe, from multiple API calls

我需要编写一个 python 脚本来

  1. 读取包含列 (person_idnameflag) 的 csv 文件。该文件有 3000 行。
  2. 基于 csv 文件中的 person_id,我需要调用 URL 传递 person_id 来执行 GET http://api.myendpoint.intranet/get-data/1234 URL 将 return person_id 的一些信息,如下例所示。我需要获取所有租金对象并保存在我的 csv 上。我的输出需要像这样
import pandas as pd
import requests

ids = pd.read_csv(f"{path}/data.csv", delimiter=';')
person_rents = df = pd.DataFrame([], columns=list('person_id','carId','price','rentStatus'))

for id in ids:
    response = request.get(f'endpoint/{id["person_id"]}')
    json = response.json()
    person_rents.append( [person_id, rent['carId'], rent['price'], rent['rentStatus'] ] )
    pd.read_csv(f"{path}/data.csv", delimiter=';' )
person_id;name;flag;cardId;price;rentStatus
1000;Joseph;1;6638;1000;active
1000;Joseph;1;5566;2000;active

响应示例

{
    "active": false,
    "ctodx": false,
    "rents": [{
            "carId": 6638,
            "price": 1000,
            "rentStatus": "active"
        }, {
            "carId": 5566,
            "price": 2000,
            "rentStatus": "active"
        }
    ],
    "responseCode": "OK",
    "status": [{
            "request": 345,
            "requestStatus": "F"
        }, {
            "requestId": 678,
            "requestStatus": "P"
        }
    ],
    "transaction": false
}
  1. 在 csv 上保存来自响应的附加数据后,我需要使用 URL 上的 carId 从另一个端点获取数据。里程结果必须保存在同一个 csv 中。 http://api.myendpoint.intranet/get-mileage/6638 http://api.myendpoint.intranet/get-mileage/5566

每次调用的return会是这样

{"mileage":1000.0000}
{"mileage":550.0000}

最终输出必须是

person_id;name;flag;cardId;price;rentStatus;mileage
1000;Joseph;1;6638;1000;active;1000.0000
1000;Joseph;1;5566;2000;active;550.0000

有人可以帮我写这个脚本吗? 可以与 pandas 或任何 python 3 lib.

有许多不同的方法可以实现这一点。其中之一是,就像您在评论中开始的那样:

  • 使用 pandas
  • 读取 CSV 文件
  • 为每一行取 person_id 并构建一个调用
  • 然后可以从租金
  • 中获取交付的JSON响应
  • 然后为每个单独的租赁提取 carId
  • 最后将其收集在 row_list
  • 然后 row_list 通过 pandas
  • 转换回 csv

没有任何错误处理的非常简单的解决方案可能如下所示:

from types import SimpleNamespace

import pandas as pd
import requests
import json

path = '/some/path/'
df = pd.read_csv(f'{path}/data.csv', delimiter=';')

rows_list = []
for _, row in df.iterrows():
    rentCall = f'http://api.myendpoint.intranet/get-data/{row.person_id}'
    print(rentCall)
    response = requests.get(rentCall)
    r = json.loads(response.text, object_hook=lambda d: SimpleNamespace(**d))
    for rent in r.rents:
        mileageCall = f'http://api.myendpoint.intranet/get-mileage/{rent.carId}'
        print(mileageCall)
        response2 = requests.get(mileageCall)
        m = json.loads(response2.text, object_hook=lambda d: SimpleNamespace(**d))
        state = "active" if r.active else "inactive"
        rows_list.append((row['person_id'], row['name'], row['flag'], rent.carId, rent.price, state, m.mileage))
df = pd.DataFrame(rows_list, columns=('person_id', 'name', 'flag', 'carId', 'price', 'rentStatus', 'mileage'))
print(df.to_csv(index=False, sep=';'))

代码说明

  • 创建数据框,dfpd.read_csv
    • 预计 'person_id' 中的所有值都是唯一的。
  • 'person_id' 上使用 .apply 来调用 prepare_data
    • prepare_data 期望 'person_id'strint,如类型注释所示,Union[int, str]
  • 调用 API,这将 return 一个 dict,到 prepare_data 函数。
  • dict'rents' 键转换为数据帧,pd.json_normalize
  • 'carId' 上使用 .apply 来调用 API,并提取作为列添加到数据帧 data'mileage' .
  • 'person_id'添加到data,可用于合并dfs
  • pd.Seriess转换为数据帧,使用pd.concat,然后mergedfs,在person_id.
  • 以所需格式 pd.to_csv 保存到 csv。

潜在问题

  • 如果有问题,最有可能出现在 call_api 函数中。
  • 只要 call_api return 是 dict,就像问题中显示的响应一样,代码的其余部分将正常工作以产生所需的输出。
import pandas as pd
import requests
import json
from typing import Union

def call_api(url: str) -> dict:
    r = requests.get(url)
    return r.json()

def prepare_data(uid: Union[int, str]) -> pd.DataFrame:
    
    d_url = f'http://api.myendpoint.intranet/get-data/{uid}'
    m_url = 'http://api.myendpoint.intranet/get-mileage/'
    
    # get the rent data from the api call
    rents = call_api(d_url)['rents']
    # normalize rents into a dataframe
    data = pd.json_normalize(rents)
    
    # get the mileage data from the api call and add it to data as a column
    data['mileage'] = data.carId.apply(lambda cid: call_api(f'{m_url}{cid}')['mileage'])
    # add person_id as a column to data, which will be used to merge data to df
    data['person_id'] = uid
    
    return data
    

# read data from file
df = pd.read_csv('file.csv', sep=';')

# call prepare_data
s = df.person_id.apply(prepare_data)

# s is a Series of DataFrames, which can be combined with pd.concat
s = pd.concat([v for v in s])

# join df with s, on person_id
df = df.merge(s, on='person_id')

# save to csv
df.to_csv('output.csv', sep=';', index=False)
  • 如果运行这段代码有错误:
    1. 发表评论,让我知道。
    2. edit 您的问题,并将整个 TraceBack 作为文本粘贴到代码块中。

例子

# given the following start dataframe
   person_id    name  flag
0       1000  Joseph     1
1        400     Sam     1

# resulting dataframe using the same data for both id 1000 and 400
   person_id    name  flag  carId  price rentStatus  mileage
0       1000  Joseph     1   6638   1000     active   1000.0
1       1000  Joseph     1   5566   2000     active   1000.0
2        400     Sam     1   6638   1000     active   1000.0
3        400     Sam     1   5566   2000     active   1000.0

通过多处理加速

您提到您有 3000 行,这意味着您将不得不进行很多次 API 调用。根据连接情况,这些调用中的每一个都可能需要一段时间。因此,以顺序方式执行此操作可能会太慢。大多数时候,您的程序只会等待服务器的响应,而不做任何其他事情。 我们可以通过使用 multiprocessing.

来提高这个性能

我使用了 中的所有代码,但我替换了以下顺序调用:

# call prepare_data
s = df.person_id.apply(prepare_data)

使用并行替代方案:

from multiprocessing import Pool
n_processes=20  # Experiment with this to see what works well
with Pool(n_processes) as p:
  s=p.map(prepare_data, df.person_id)

或者,线程池可能更快,但您必须通过将导入替换为 from multiprocessing.pool import ThreadPool as Pool.