使用多线程的 Asyncio 请求
Asyncio requests using multithreading
我有很多公司,正在调用 REST API 来获取每家公司的每日股价。详细信息存储在 PostgreSQL 数据库中。核心函数如下所示:
async def get_data_asynchronous():
conn = await asyncpg.connect(**DBConn)
path = 'path'
source = pd.read_excel(io=path + 'companies.xlsx', sheet_name='data')
retries = Retry(total=2, backoff_factor=1, status_forcelist=[404, 502, 503, 504])
dates = pd.date_range('2015-01-01', '2019-12-01', freq='D').strftime("%d-%m-%Y").tolist()
with ThreadPoolExecutor(max_workers=10) as executor:
with requests.Session() as session:
session.mount('https://', HTTPAdapter(max_retries=retries))
loop = asyncio.get_event_loop()
for index, inputrow in source.iterrows():
try:
if int(inputrow['rowid']) > 0:
compid = inputrow['compid'].lower().strip()
tasks = [
loop.run_in_executor(
executor,
fetch,
*(session, compid, datetime.datetime.strptime(str(dates[i-1]), '%d-%m-%Y'), datetime.datetime.strptime(str(dates[i]), '%d-%m-%Y'))
)
for i in range(len(dates))
]
for content in await asyncio.gather(*tasks):
if content is not None:
for data in content:
compid = data.get('compid', '')
date = data.get('date', '')
stock_price = data.get('sprice', '')
try:
await conn.execute('''
INSERT INTO comp_dailyhistory VALUES(, , )
''', compid, date, stock_price)
except Exception as e:
print('ERROR')
pass
pass
except Exception as e:
print(str(e))
pass
在上述函数中,我首先从 excel 工作表(来源)中获取公司列表并创建日期列表。由于我的列表中有超过 20 万家公司,我创建了一个最多包含 10 个 worker 的 ThreadPoolExecutor。目的是传递每个公司 ID (compid) 和日期范围内的两个连续日期
以异步的方式到一个'fetch'函数,从而加速整个数据采集过程。提取函数如下所示:
def fetch(session, compid, start, stop):
base_url = 'baseurl'
try:
with session.get(base_url + 'compid=' + compid + '&begin=' + str(int(start.timestamp())) + '&end=' + str(int(stop.timestamp())), timeout=None) as data:
content = []
if data.status_code == 200:
for item in data.json():
ret = {'compid': compid, 'date': str(date), 'sprice': sprice}
content.append(ret)
return content
else:
return None
except Exception as e:
return None
fetch 函数使用 requests.get 获取公司在开始日期和停止日期之间的股票价格列表,将 JSON 响应解析为键值对列表,并且 returns 它们到调用函数。然后返回的列表由调用函数中的 asyncio.gather 函数获取,其中每个股票价格都使用 asyncpg 存储在 postgreSQL 中。其余代码如下:
def main():
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(get_data_asynchronous())
loop.run_until_complete(future)
main()
此设置的主要问题是脚本似乎没有获取给定公司的全套价格。例如,对于 compid = 1,应该有正好 600 个每日价格。但是,每次脚本为 运行 时,我都会得到不同的结果,它总是低于真实计数。例如,我在第一个 运行 中获得 550 个每日价格,在第二个 运行 中获得 570 个,在第三个 运行 中获得 540 个,依此类推....
为什么我的脚本无法获取 600 个每日价格的完整列表?我的一些请求是否以某种方式被丢弃了?我尝试了使用 aiohttp 请求的替代方法,但没有取得太大进展。
我没有多线程编程的经验,尤其是 asyncio,我真的很感激这方面的任何帮助吗?在此先感谢您的时间。
我做过几个涉及抓取网站的项目,每天获取数千个股票价格。正如 dano 所建议的,该问题与您的错误处理有关:
except Exception as e:
return None
这对处理失败的请求没有任何作用。您可以将失败的 url 附加到列表中,并在脚本的末尾 运行 您的 "get" 函数再次使用这些 url。如果您的信息很重要,您甚至可以定义一个函数,该函数至少尝试 5-10 次来下载它之前的股票信息 returns None。
与多线程问题更相关,您需要注意每个 second/minute/hour 的请求数,避免超过 API/website 速率限制。您可以为此使用多个代理。
希望对您有所帮助。
我有很多公司,正在调用 REST API 来获取每家公司的每日股价。详细信息存储在 PostgreSQL 数据库中。核心函数如下所示:
async def get_data_asynchronous():
conn = await asyncpg.connect(**DBConn)
path = 'path'
source = pd.read_excel(io=path + 'companies.xlsx', sheet_name='data')
retries = Retry(total=2, backoff_factor=1, status_forcelist=[404, 502, 503, 504])
dates = pd.date_range('2015-01-01', '2019-12-01', freq='D').strftime("%d-%m-%Y").tolist()
with ThreadPoolExecutor(max_workers=10) as executor:
with requests.Session() as session:
session.mount('https://', HTTPAdapter(max_retries=retries))
loop = asyncio.get_event_loop()
for index, inputrow in source.iterrows():
try:
if int(inputrow['rowid']) > 0:
compid = inputrow['compid'].lower().strip()
tasks = [
loop.run_in_executor(
executor,
fetch,
*(session, compid, datetime.datetime.strptime(str(dates[i-1]), '%d-%m-%Y'), datetime.datetime.strptime(str(dates[i]), '%d-%m-%Y'))
)
for i in range(len(dates))
]
for content in await asyncio.gather(*tasks):
if content is not None:
for data in content:
compid = data.get('compid', '')
date = data.get('date', '')
stock_price = data.get('sprice', '')
try:
await conn.execute('''
INSERT INTO comp_dailyhistory VALUES(, , )
''', compid, date, stock_price)
except Exception as e:
print('ERROR')
pass
pass
except Exception as e:
print(str(e))
pass
在上述函数中,我首先从 excel 工作表(来源)中获取公司列表并创建日期列表。由于我的列表中有超过 20 万家公司,我创建了一个最多包含 10 个 worker 的 ThreadPoolExecutor。目的是传递每个公司 ID (compid) 和日期范围内的两个连续日期 以异步的方式到一个'fetch'函数,从而加速整个数据采集过程。提取函数如下所示:
def fetch(session, compid, start, stop):
base_url = 'baseurl'
try:
with session.get(base_url + 'compid=' + compid + '&begin=' + str(int(start.timestamp())) + '&end=' + str(int(stop.timestamp())), timeout=None) as data:
content = []
if data.status_code == 200:
for item in data.json():
ret = {'compid': compid, 'date': str(date), 'sprice': sprice}
content.append(ret)
return content
else:
return None
except Exception as e:
return None
fetch 函数使用 requests.get 获取公司在开始日期和停止日期之间的股票价格列表,将 JSON 响应解析为键值对列表,并且 returns 它们到调用函数。然后返回的列表由调用函数中的 asyncio.gather 函数获取,其中每个股票价格都使用 asyncpg 存储在 postgreSQL 中。其余代码如下:
def main():
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(get_data_asynchronous())
loop.run_until_complete(future)
main()
此设置的主要问题是脚本似乎没有获取给定公司的全套价格。例如,对于 compid = 1,应该有正好 600 个每日价格。但是,每次脚本为 运行 时,我都会得到不同的结果,它总是低于真实计数。例如,我在第一个 运行 中获得 550 个每日价格,在第二个 运行 中获得 570 个,在第三个 运行 中获得 540 个,依此类推....
为什么我的脚本无法获取 600 个每日价格的完整列表?我的一些请求是否以某种方式被丢弃了?我尝试了使用 aiohttp 请求的替代方法,但没有取得太大进展。
我没有多线程编程的经验,尤其是 asyncio,我真的很感激这方面的任何帮助吗?在此先感谢您的时间。
我做过几个涉及抓取网站的项目,每天获取数千个股票价格。正如 dano 所建议的,该问题与您的错误处理有关:
except Exception as e:
return None
这对处理失败的请求没有任何作用。您可以将失败的 url 附加到列表中,并在脚本的末尾 运行 您的 "get" 函数再次使用这些 url。如果您的信息很重要,您甚至可以定义一个函数,该函数至少尝试 5-10 次来下载它之前的股票信息 returns None。
与多线程问题更相关,您需要注意每个 second/minute/hour 的请求数,避免超过 API/website 速率限制。您可以为此使用多个代理。
希望对您有所帮助。