Python concurrent.futures for a pandas DataFrame

I have a dataframe with a few thousand rows:

input_df

case_id api_param   stat
1        data1      1
2        data2      0
1        data3      0
4        data4      0
1        data5      1

I did a groupBy(case_id) and got:

  case_id    1      2       3  
      1     data1  data3  data5          
      2     data2  nan    nan   
      4     data4  nan    nan
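
As a rough sketch, the wide table above could be reproduced by numbering the rows inside each case_id group and pivoting on that number. The column name `pos` is made up for illustration; the question does not show how the table was actually built.

```python
import pandas as pd

input_df = pd.DataFrame({
    "case_id": [1, 2, 1, 4, 1],
    "api_param": ["data1", "data2", "data3", "data4", "data5"],
    "stat": [1, 0, 0, 0, 1],
})

# Number the rows inside each case_id group: 1, 2, 3, ...
pos = input_df.groupby("case_id").cumcount() + 1

# One row per case_id, one column per position; missing slots become NaN.
wide = input_df.assign(pos=pos).pivot(index="case_id", columns="pos", values="api_param")
print(wide)
```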

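Now suppose that, for every case_id, I want to modify the date values in the api_param column wherever stat == 0 => modify data2, data3, data4. To do this, I decided to pick a new value within k data points of the previous one and call an API to check whether the new value is valid;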

i.e. url = https://example.com/over/there?name=api_param[i], with api_param == data2 + k data points, for example for case_id == 2 above. If the API response is 200, I can overwrite the old value in input_df.

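My file may contain thousands of such cases, each with many data points to change. Say I have 300 cases, each with 100 dates to modify.

Making these calls one at a time with Python requests would therefore be very slow. I would like to use concurrent.futures; how can I do that?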
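
One possible shape for this, as a minimal sketch: hand each candidate value to a `concurrent.futures.ThreadPoolExecutor` and keep only those the API accepts with status 200. The names `check_candidates`, `fetch_status`, and `fake_status` are hypothetical, and the API call is replaced by a stand-in so the example runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor


def check_candidates(candidates, fetch_status, max_workers=10):
    """Return {row_label: candidate} for candidates whose status is 200.

    candidates   -- dict mapping a row label to the candidate value to test
    fetch_status -- callable taking a candidate and returning an HTTP status code
    """
    labels = list(candidates)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map runs the calls in worker threads and
        # yields the results in the same order as the inputs.
        statuses = list(executor.map(fetch_status, (candidates[k] for k in labels)))
    return {k: candidates[k] for k, s in zip(labels, statuses) if s == 200}


# Stand-in for the real API call (assumption: the real one returns a status code).
def fake_status(value):
    return 404 if value == "data3_new" else 200


accepted = check_candidates({1: "data2_new", 2: "data3_new", 3: "data4_new"}, fake_status)
print(accepted)  # {1: 'data2_new', 3: 'data4_new'}
```

With real data, `fetch_status` would presumably wrap something like `requests.get(url, timeout=10).status_code`, and each accepted value could then be written back with `input_df.loc[label, 'api_param'] = value`.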

You can use multiprocessing.pool.ThreadPool:

from multiprocessing.pool import ThreadPool
from datetime import timedelta

import requests

# get dates with stat=0
dates = input_df[input_df['stat']==0]['api_param']
# get urls, add 7 days to date (assuming date is already datetime.datetime)
urls = dates.apply(lambda date_obj: 'https://example.com/over/there?name=%s' % str(date_obj+timedelta(days=7))).tolist()

# run the requests on 10 worker threads; results come back in the same order as urls
with ThreadPool(10) as pool:
    results = pool.map(requests.get, urls)

# add request status to input_df
input_df['request_status'] = 0
input_df.loc[input_df['stat']==0, 'request_status'] = [x.status_code for x in results]

# update dates
accepted = (input_df['stat']==0) & (input_df['request_status']==200)
input_df.loc[accepted, 'api_param'] = input_df.loc[accepted, 'api_param'].apply(lambda date_obj: date_obj+timedelta(days=7))
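
One caveat with `pool.map`: if any single request raises (a timeout or connection error, say), the exception propagates and the other results are lost. A possible workaround, sketched here with a hypothetical `safe_status` wrapper and a fake `get` function standing in for `requests.get`, is to catch the error inside the worker and return None instead of a status code.

```python
from functools import partial
from multiprocessing.pool import ThreadPool
from types import SimpleNamespace


def safe_status(get, url, timeout=10):
    """Return the status code for url, or None if the request raised."""
    try:
        return get(url, timeout=timeout).status_code
    except Exception:
        return None


# Stand-in for requests.get so the sketch runs offline (assumption).
def fake_get(url, timeout=None):
    if "bad" in url:
        raise ConnectionError("simulated network failure")
    return SimpleNamespace(status_code=200)


urls = ["https://example.com/ok", "https://example.com/bad", "https://example.com/ok2"]
with ThreadPool(3) as pool:
    statuses = pool.map(partial(safe_status, fake_get), urls)
print(statuses)  # [200, None, 200]
```

Rows whose status is None would then simply fail the `== 200` check and keep their old value.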

Try using this function:

from concurrent import futures
from datetime import datetime

import pandas as pd
import requests


def check_api_call(count, dates):
    # dates: pandas Series of 'YYYY-MM-DD' strings
    # count: number of 7-day steps already applied
    executor = futures.ThreadPoolExecutor()
    for i in range(len(dates)):
        date = dates.values[i]
        # note: result() blocks right after submit(), so the requests still run one at a time
        response = executor.submit(task_api, date).result()
        while not response:
            # shift the original date by another 7 days and try again
            # (count is not reset between dates)
            count = count + 1
            td = pd.to_timedelta(count * 7, unit='d')
            delta_date = datetime.strptime(date, "%Y-%m-%d") + td
            new_date = delta_date.strftime("%Y-%m-%d")
            response = executor.submit(task_api, new_date).result()
            if not response:
                continue
            dates.values[i] = new_date
    executor.shutdown()
    return True, dates


def task_api(date):
    url = "https://example.com/over/there?name=" + date
    response = requests.get(url)
    # any status other than 404 counts as valid
    return response.status_code != 404
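
Because the function above waits for each result immediately after submitting it, the requests effectively run sequentially. A variant that actually overlaps the work might submit one retry loop per date and collect the results as they finish. This is only a sketch: `first_valid_date`, `resolve_dates`, the `max_tries` cap, and the set-based validity check are assumptions added for illustration, with the check standing in for `task_api`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta


def first_valid_date(date, is_valid, step_days=7, max_tries=5):
    """Try the date itself, then up to max_tries shifted dates; return the first valid one or None."""
    base = datetime.strptime(date, "%Y-%m-%d")
    for attempt in range(max_tries + 1):
        candidate = (base + timedelta(days=attempt * step_days)).strftime("%Y-%m-%d")
        if is_valid(candidate):
            return candidate
    return None


def resolve_dates(dates, is_valid, max_workers=10):
    """Resolve all dates in parallel; the returned list is aligned with dates."""
    resolved = [None] * len(dates)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the position of the date it belongs to.
        pending = {executor.submit(first_valid_date, d, is_valid): i
                   for i, d in enumerate(dates)}
        for future in as_completed(pending):
            resolved[pending[future]] = future.result()
    return resolved


# Stand-in validity check so the sketch runs without network access.
valid = {"2021-01-01", "2021-03-15"}
result = resolve_dates(["2021-01-01", "2021-03-01", "2021-06-01"], valid.__contains__)
print(result)  # ['2021-01-01', '2021-03-15', None]
```

Capping the retries avoids looping forever when no shifted date is ever accepted; dates that come back as None can be left unchanged in the dataframe.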