Using asyncio nested_future() and gather() with nested loops
While trying to run several async functions in parallel, I keep getting an error that I would like to understand.
I am running asyncio on Python 3.5.1, combining aiohttp through ClientSession with aiopg (asynchronous psycopg2) calls.
The main idea: I have a loop that reads rows from a table, iterates over them, and fires off, in parallel, several calls to the function row_loop, which starts an asynchronous GET web request and then writes the result for each row back to the same database through another cursor.
ensure_future() and gather() appear to work, but only the first database write goes through; the others raise an exception complaining that another coroutine is already waiting:
Traceback (most recent call last):
  File "sandbox/loop.py", line 148, in write_dict
    await cur2.execute(INSERT, (tuple(adict.values()),))
  File "/Users/mrigal/.virtualenvs/py35/lib/python3.5/site-packages/aiopg/cursor.py", line 103, in execute
    waiter = self._conn._create_waiter('cursor.execute')
  File "/Users/mrigal/.virtualenvs/py35/lib/python3.5/site-packages/aiopg/connection.py", line 211, in _create_waiter
    'data' % func_name)
RuntimeError: cursor.execute() called while another coroutine is already waiting for incoming data
The problem might lie in the aiopg library, or it might be that I am registering the event loop against main rather than against the function where .gather() actually happens, but I could not find any documentation about this...
Without ensure_future() and gather() the code is slow, because every call completes one after the other. I may not have understood the purpose of gather() well and may need a real multithreading solution, but I would like to test this intermediate step first.
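For what it's worth, gather() does not create threads: it schedules the awaitables concurrently on a single event loop, so any speed-up comes from overlapping the I/O waits. A minimal, self-contained sketch of that behaviour:

import asyncio

async def fake_io(i):
    # asyncio.sleep() yields control, letting the other coroutines run
    await asyncio.sleep(1)
    return i

async def demo():
    # three 1-second "requests" finish in about 1 second total, not 3,
    # because gather() runs them concurrently on the same event loop
    results = await asyncio.gather(*[fake_io(i) for i in range(3)])
    print(results)  # [0, 1, 2] -- results keep submission order

loop = asyncio.get_event_loop()
loop.run_until_complete(demo())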
Here is my actual code:
import asyncio

import aiohttp
import aiopg
from psycopg2.extras import DictCursor

# URL, SELECT_QUERY, INSERT_QUERY, build_adict and OverQueryLimitException
# are defined elsewhere in the project

async def make_get(row, session, spec_country_code=None):
    result = await session.get(URL, country=spec_country_code)
    if not spec_country_code and result.country != row.get('country'):
        return await make_get(row, session, spec_country_code=result.country)
    return result

async def write_dict(conn, adict):
    async with conn.cursor() as cur2:
        await cur2.execute(INSERT_QUERY, (tuple(adict.values()),))

async def row_loop(conn, row, session):
    result = await make_get(row=row, session=session)
    if result.status == 'OVER_QUERY_LIMIT':
        raise OverQueryLimitException()
    else:
        adict = build_adict(row, result)
        await write_dict(conn=conn, adict=adict)
    return result.status

async def write_loop(conn):
    failed_count = 0
    rows = []
    async with aiohttp.ClientSession() as session:
        async with conn.cursor(cursor_factory=DictCursor) as cur:
            await cur.execute(SELECT_QUERY)
            async for row in cur:
                # THIS WORKS, BUT I WOULD LIKE TO USE gather()
                # try:
                #     status = await row_loop(conn=conn, row=row, session=session)
                # except OverQueryLimitException:
                #     break
                # if status != 'OK':
                #     failed_count += 1
                rows.append(asyncio.ensure_future(
                    row_loop(conn=conn, row=row, session=session)))
            responses = await asyncio.gather(*rows)
            print(len(responses))  # gather() returns a plain list of results
    return cur.rownumber, failed_count

def print_result(mode, select_count, failed_count):
    print("Tried to {} {} new entries".format(mode, select_count))
    print("Found {} failed/skipped entries".format(failed_count))

async def insert_new(conn):
    select_count, failed_count = await write_loop(conn=conn)
    print_result('insert', select_count, failed_count)

async def main():
    async with aiopg.create_pool('db_string') as pool:
        async with pool.acquire() as conn:
            await insert_new(conn=conn)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
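A minimal sketch of one possible fix, assuming the clash comes only from the concurrent INSERT cursors: serialize just the writes with an asyncio.Lock, so gather() still overlaps the HTTP requests while only one cursor at a time runs on the shared connection (write_lock is a hypothetical addition, not in the original code):

import asyncio

# hypothetical: a single lock shared by all row_loop() tasks
write_lock = asyncio.Lock()

async def write_dict(conn, adict):
    # only one coroutine at a time may execute a cursor
    # on the shared connection
    async with write_lock:
        async with conn.cursor() as cur2:
            await cur2.execute(INSERT_QUERY, (tuple(adict.values()),))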
(Update: added information and an example about concurrent cursor execution on the same connection.)
aiopg does not allow two cursors to be used on the same connection at the same time: you must close one cursor before executing a new command:
import asyncio
import aiopg

DSN = 'dbname=aiopg'  # to create, run: `createdb aiopg`

async def test_cursor(conn):
    async with conn.cursor() as cur:
        await cur.execute("SELECT 1")
        async for row in cur:
            print(row)

async def go():
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            print("part 1:")
            await test_cursor(conn)
            await test_cursor(conn)

            print("part 2:")
            await asyncio.wait([test_cursor(conn), test_cursor(conn)])  # fails

loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()
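By contrast, a version of part 2 that passes, sketched here as an assumption rather than code from the answer: acquire a separate connection from the pool inside each task, so no two cursors ever share a connection (test_cursor_own_conn is a hypothetical variant of test_cursor above):

import asyncio
import aiopg

# reuses DSN from the example above

async def test_cursor_own_conn(pool):
    # each task gets its own connection, so the cursors never clash
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            async for row in cur:
                print(row)

async def go_parallel():
    async with aiopg.create_pool(DSN) as pool:
        # both tasks run concurrently, each on its own connection
        await asyncio.wait([test_cursor_own_conn(pool),
                            test_cursor_own_conn(pool)])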
You may try to use several connections to work around this, however:
Although this does not directly answer the specific question, I would like to suggest a general solution: database access (on a local/LAN host, whether for reading or writing) is usually considered very fast, especially compared to remote (WAN) HTTP requests. Using synchronous database access in this program would probably not make it slower, but it would reduce its complexity. Try something along these lines, without aiopg:
import asyncio
import aiohttp

async def main():
    rows = get_rows_from_db()  # sync
    async with aiohttp.ClientSession() as session:
        tasks = [process_row(session, row) for row in rows]
        responses = await asyncio.gather(*tasks)

async def process_row(session, row):
    async with session.get(url) as response:
        result = await response.text()
        write_db_result(row, result)  # sync db access

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
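If the synchronous writes ever do become a bottleneck, a middle ground, sketched here as an assumption on top of the snippet above, is to hand them to the default thread pool with loop.run_in_executor(), so the event loop stays free to drive the remaining HTTP requests:

import asyncio

async def process_row(session, row):
    async with session.get(url) as response:
        result = await response.text()
    loop = asyncio.get_event_loop()
    # run the blocking DB write in a worker thread; the event loop keeps
    # serving the other coroutines while the write completes
    await loop.run_in_executor(None, write_db_result, row, result)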