Python concurrent.futures for a pandas DataFrame
I have a DataFrame with a few thousand rows:

input_df

case_id  api_param  stat
      1      data1     1
      2      data2     0
      1      data3     0
      4      data4     0
      1      data5     1

I did a groupby(case_id) and got:

case_id      1      2      3
      1  data1  data3  data5
      2  data2    nan    nan
      4  data4    nan    nan
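For reference, one way to produce that wide table is to number the rows within each case_id and pivot; a minimal sketch (assuming the sample data above):

```python
import pandas as pd

input_df = pd.DataFrame({
    'case_id':   [1, 2, 1, 4, 1],
    'api_param': ['data1', 'data2', 'data3', 'data4', 'data5'],
    'stat':      [1, 0, 0, 0, 1],
})

# Number the rows within each case_id (1, 2, 3, ...), then pivot so
# each case_id becomes one row with its api_param values as columns.
wide = (input_df
        .assign(col=input_df.groupby('case_id').cumcount() + 1)
        .pivot(index='case_id', columns='col', values='api_param'))
```

Rows with fewer data points than the longest case come out as NaN, matching the table above.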
Now, for each case_id, I want to modify the date values in the api_param column wherever stat == 0, i.e. modify data2, data3, and data4.
To do this, I pick a new date within k data points of the previous one and call an API to check whether the date is valid, i.e.: url = https://example.com/over/there?name=api_param[i] with api_param == data2 + k data points, for example case_id == 2 above.
If the API responds with 200, I can overwrite the old value in input_df.
My file can contain thousands of such cases, each with many data points to change. Say I have 300 cases, each with 100 dates to modify: doing this with sequential Python requests calls would be very slow. I want to use concurrent.futures; how do I do that?
You can use multiprocessing.pool.ThreadPool:

from multiprocessing.pool import ThreadPool
from datetime import timedelta

import requests

# get the dates with stat == 0
dates = input_df[input_df['stat'] == 0]['api_param']
# build the URLs, adding 7 days to each date (assuming the column holds datetime objects)
urls = dates.apply(lambda date_obj: 'https://example.com/over/there?name=%s' % str(date_obj + timedelta(days=7))).tolist()

with ThreadPool(10) as pool:
    results = pool.map(requests.get, urls)

# add the request status to input_df
input_df['request_status'] = 0
input_df.loc[input_df['stat'] == 0, 'request_status'] = [x.status_code for x in results]
# update the dates the API accepted
mask = (input_df['stat'] == 0) & (input_df['request_status'] == 200)
input_df.loc[mask, 'api_param'] = input_df.loc[mask, 'api_param'].apply(lambda date_obj: date_obj + timedelta(days=7))
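Since the question asked for concurrent.futures specifically: ThreadPoolExecutor.map is an order-preserving drop-in for pool.map above. A minimal sketch with a stand-in fetch so it runs without network access (in the real script, fetch would return requests.get(url).status_code):

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical URLs standing in for the ones built from the DataFrame.
urls = ['https://example.com/over/there?name=data%d' % i for i in range(5)]

def fetch(url):
    # Stand-in for requests.get(url).status_code, so the sketch
    # runs without network access.
    return 200

with ThreadPoolExecutor(max_workers=10) as executor:
    # executor.map preserves input order, so the results line up
    # with the stat == 0 rows exactly as pool.map does.
    status_codes = list(executor.map(fetch, urls))
```

The rest of the answer (assigning request_status and shifting the accepted dates) works unchanged with status_codes in place of [x.status_code for x in results].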
Please try this function:

from concurrent import futures
from datetime import datetime

import pandas as pd
import requests

def check_api_call(count, dates):
    executor = futures.ThreadPoolExecutor()
    for i in range(len(dates.values)):
        date = dates.values[i]
        response = executor.submit(task_api, date).result()
        while not response:
            # try the next candidate date, 7 more days out each time
            count = count + 1
            td = pd.to_timedelta(count * 7, unit='d')
            delta_date = datetime.strptime(date, "%Y-%m-%d") + td
            new_date = delta_date.strftime("%Y-%m-%d")
            response = executor.submit(task_api, new_date).result()
            if not response:
                continue
            dates.values[i] = new_date
    return True, dates

def task_api(date):
    # a 404 response means the candidate date is not valid
    url = "https://example.com/over/there?name=" + date
    response = requests.get(url)
    return response.status_code != 404
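Note that calling .result() immediately after each submit makes the function wait for every request before issuing the next one, so the requests still run one at a time. To actually overlap them, submit a batch of dates first and collect the results with as_completed; a sketch with a stand-in task_api (assume every date is valid, so it runs without network access):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical candidate dates standing in for dates.values.
dates = ['2020-01-01', '2020-01-08', '2020-01-15']

def task_api(date):
    # Stand-in for the requests.get call above.
    return True

with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit every date up front so the requests run concurrently,
    # then handle each one as soon as its response arrives.
    future_to_date = {executor.submit(task_api, d): d for d in dates}
    valid = {}
    for future in as_completed(future_to_date):
        valid[future_to_date[future]] = future.result()
```

Dates that come back invalid can then be resubmitted with the next 7-day offset, keeping the pool busy instead of blocking on each date in turn.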