Retrieve API data into a dataframe using the multithreading module
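I am using a third-party API to retrieve 10-minute data for different tags over a large number of days. The extraction can currently take several minutes, depending on the number of days and tags, so I am trying multithreading, which I understand is useful for heavy I/O operations.
The API call looks like this (I have replaced the actual API name):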
import numpy as N
import requests as r
import json
import pandas as pd
from datetime import datetime
import concurrent.futures

class pyGeneric:
    def __init__(self, serverName, apiKey, rootApiUrl='/Generic.Services/api'):
        """
        Initialize a connection to server, and return a pyGeneric server object
        """
        self.baseUrl = serverName + rootApiUrl
        self.apiKey = apiKey
        self.bearer = 'Bearer ' + apiKey
        self.header = {'mediaType':'application/json','Authorization':self.bearer}

    def getRawMeasurementsJson(self, tag, start, end):
        apiQuery = '/measurements/' + tag + '/from/' + start + '/to/' + end + '?format=json'
        dataresponse = r.get(self.baseUrl+apiQuery, headers=self.header)
        data = json.loads(dataresponse.text)
        return data

    def getAggregatesPandas(self, tags, start, end):
        """
        Return tag(s) in a pandas dataFrame
        """
        df = pd.DataFrame()
        if type(tags) == str:
            tags = [tags]
        for tag in tags:
            tempJson = self.getRawMeasurementsJson(tag, start, end)
            tempDf = pd.DataFrame(tempJson['timeSeriesList'][0]['timeSeries'])
            name = tempJson['timeSeriesList'][0]['measurementName']
            df['TimeUtc'] = [datetime.fromtimestamp(i/1000) for i in tempDf['t']]
            df['TimeUtc'] = df['TimeUtc'].dt.round('min')
            df[name] = tempDf['v']
        return df

gener = pyGeneric('https://api.generic.com', 'auth_keymlkj9789878686')
An example call to the API looks like this:
gener_df = gener.getAggregatesPandas('tag1.10m.SQL', '*-10d', '*')
This works for a single tag, but it takes much longer for a list of tags, which is why I have been trying the following:
tags = ['tag1.10m.SQL',
        'tag2.10m.SQL',
        'tag3.10m.SQL',
        'tag4.10m.SQL',
        'tag5.10m.SQL',
        'tag6.10m.SQL',
        'tag7.10m.SQL',
        'tag8.10m.SQL',
        'tag9.10m.SQL',
        'tag10.10m.SQL']
startdate = "*-150d"
enddate = '*'
final_df = pd.DataFrame
with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i,startdate, enddate) for i in tags)
    executor.map(lambda p: gener.getAggregatesPandas(*p), args)
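However, I have no way of checking whether gener.getAggregatesPandas is executed correctly. Ultimately I would like the results in a single dataframe called final_df, but I am not sure how to go about that either. I read in this post that appending inside the context manager causes quadratic copying of the dataframe, which would end up slowing things down.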
You can try the following; provided the server can handle it, it lets you easily issue many requests in parallel:
# thread_map is just a wrapper around concurrent.futures ThreadPoolExecutor with a nice tqdm progress bar
from tqdm.contrib.concurrent import thread_map, process_map  # multi-threading and multi-processing, respectively

def chunk_list(lst, size):
    """
    Adapted from Stack Overflow;
    yield successive size-sized chunks from lst.
    """
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

# huge_list, which_func_to_call and your_cpu_cores are placeholders for your own values
for idx, my_chunk in enumerate(chunk_list(huge_list, size=2**12)):
    for response in thread_map(which_func_to_call, my_chunk, max_workers=your_cpu_cores + 6):
        # which_func_to_call should wrap the request and return the response JSON object
        # do something with the response here
        # make sure to cache the chunk results as well
        pass
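Since thread_map wraps ThreadPoolExecutor, the same chunk-then-map pattern can be sketched with the standard library alone. This is only an illustration: fake_fetch is a hypothetical stand-in for a real request function, and the chunk size and worker count are arbitrary.

```python
from concurrent.futures import ThreadPoolExecutor


def chunk_list(lst, size):
    """Yield successive size-sized chunks from lst."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]


def fake_fetch(tag):
    """Hypothetical stand-in for an API call; returns a dict like a parsed JSON response."""
    return {"tag": tag, "n_points": len(tag)}


def fetch_in_chunks(items, func, chunk_size=4, max_workers=8):
    """Run func over items chunk by chunk, preserving input order."""
    responses = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for chunk in chunk_list(items, chunk_size):
            # executor.map yields results in the order of its inputs
            responses.extend(executor.map(func, chunk))
    return responses


tags = [f"tag{i}.10m.SQL" for i in range(1, 11)]
responses = fetch_in_chunks(tags, fake_fetch)
print(len(responses), responses[0])
```

Processing one chunk at a time gives a natural point to cache partial results before moving on to the next chunk.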
Edit 1:
from functools import partial
startdate = "*-150d"
enddate = '*'
my_new_func = partial(which_func_to_call, startdate=startdate, enddate=enddate)
Now we can use this function instead. Note that my_new_func now accepts a single argument.
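To make the partial step concrete, here is a small self-contained sketch. fetch_tag is a hypothetical wrapper that only builds the request path instead of calling the API. Note that the keyword names passed to partial must match the wrapped function's parameter names; getAggregatesPandas in the question names them start and end, so a wrapper around it would need to bind those names instead.

```python
from functools import partial


def fetch_tag(tag, startdate, enddate):
    """Hypothetical wrapper; a real one might call the API for this tag and date range."""
    return f"/measurements/{tag}/from/{startdate}/to/{enddate}"


# Bind the arguments that are identical for every call
fetch_last_150_days = partial(fetch_tag, startdate="*-150d", enddate="*")

# The resulting callable takes only the tag, so it fits map-style APIs
paths = list(map(fetch_last_150_days, ["tag1.10m.SQL", "tag2.10m.SQL"]))
print(paths[0])
```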
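Edit 2:
For caching, I suggest using the csv module and writing the responses you want to a CSV file rather than using pandas; alternatively, you can dump the JSON responses as needed. Sample code for JSON/dict-like responses is shown below.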
import csv
import os

# OUTPUT_FILE_NAME, fieldnames, huge_list, CHUNK_SIZE, my_partial_wrapped_func and row are placeholders
with open(OUTPUT_FILE_NAME, "a+", newline="") as csvfile:
    # fieldnames = [your_headers_list]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    # The file is opened in append mode, so make sure writer.writeheader() is called only once
    for idx, my_chunk in enumerate(chunk_list(huge_list, size=CHUNK_SIZE)):
        for response in thread_map(
            my_partial_wrapped_func, my_chunk, max_workers=min(32, os.cpu_count() + 6)
        ):
            # .......
            # build row from response: a dict with the fieldnames as keys
            writer.writerow(row)
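One way to honor the "write the header only once" caveat is to check whether the file is new or empty before appending. The sketch below is a minimal illustration with assumed names: the column names, the append_rows helper, and the temporary cache path are all hypothetical.

```python
import csv
import os
import tempfile

fieldnames = ["tag", "t", "v"]  # hypothetical column names


def append_rows(path, rows):
    """Append dict rows to a CSV file, writing the header only if the file is new or empty."""
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(rows)


out_path = os.path.join(tempfile.mkdtemp(), "cache.csv")
append_rows(out_path, [{"tag": "tag1", "t": 0, "v": 1.5}])
append_rows(out_path, [{"tag": "tag2", "t": 600000, "v": 2.5}])

# Read the cache back to confirm both batches landed under a single header
with open(out_path, newline="") as f:
    cached = list(csv.DictReader(f))
print(cached)
```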
If I understand correctly, you need to know whether getAggregatesPandas executed correctly.
You can do it as shown below.
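with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i,startdate, enddate) for i in tags)
    # Iterating the map results re-raises any exception that occurred in a worker
    results = list(executor.map(lambda p: gener.getAggregatesPandas(*p), args))
# DataFrame.append returned a new frame instead of modifying final_df in place
# (and was removed in pandas 2.0), so concatenate all results once
final_df = pd.concat(results, ignore_index=False)
# another approach: create futures with executor.submit(...) and loop over
# concurrent.futures.as_completed(futures), calling f.result() on each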
Reference video: video
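The submit/as_completed variant mentioned in the comment above could look roughly like the sketch below. It is an illustration under stated assumptions, not the API's actual behavior: fake_get_aggregates is a hypothetical stand-in for gener.getAggregatesPandas that returns a TimeUtc column plus one value column, and "bad_tag" is an invented tag used to show how a failed call surfaces. Collecting the frames and concatenating once at the end also avoids the repeated copying caused by appending inside the loop.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd


def fake_get_aggregates(tag, start, end):
    """Hypothetical stand-in for gener.getAggregatesPandas; no network access."""
    if tag == "bad_tag":
        raise ValueError(f"no data for {tag}")
    times = pd.date_range("2024-01-01", periods=3, freq="10min")
    return pd.DataFrame({"TimeUtc": times, tag: [1.0, 2.0, 3.0]})


tags = ["tag1.10m.SQL", "tag2.10m.SQL", "bad_tag"]
frames, errors = {}, {}

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its tag so failures can be attributed
    future_to_tag = {
        executor.submit(fake_get_aggregates, t, "*-150d", "*"): t for t in tags
    }
    for future in as_completed(future_to_tag):
        tag = future_to_tag[future]
        try:
            frames[tag] = future.result()  # re-raises any exception from the worker
        except Exception as exc:
            errors[tag] = exc

# Concatenate once, column-wise, aligning rows on the timestamp
ordered = [frames[t].set_index("TimeUtc") for t in tags if t in frames]
final_df = pd.concat(ordered, axis=1)
print(final_df.shape, list(errors))
```

Checking the errors dict after the pool exits answers the "did it execute correctly" question per tag, while final_df holds one column per successful tag.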