asyncpg - 无法执行操作:另一个操作正在进行中
asyncpg - cannot perform operation: another operation is in progress
我正在尝试解决以下错误:
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
这是完整的回溯:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
│ │ └ 4
│ └ 7
└ <function _main at 0x109c8aca0>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 129, in _main
return self._bootstrap(parent_sentinel)
│ │ └ 4
│ └ <function BaseProcess._bootstrap at 0x109b1f8b0>
└ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
│ └ <function BaseProcess.run at 0x109b18ee0>
└ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
│ │ │ │ │ └ {'config': <uvicorn.config.Config object at 0x109cd55b0>, 'target': <bound method Server.run of <uvicorn.server.Server object...
│ │ │ │ └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
│ │ │ └ ()
│ │ └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
│ └ <function subprocess_started at 0x10a4aca60>
└ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/subprocess.py", line 61, in subprocess_started
target(sockets=sockets)
│ └ [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]
└ <bound method Server.run of <uvicorn.server.Server object at 0x109cd56a0>>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/server.py", line 48, in run
loop.run_until_complete(self.serve(sockets=sockets))
│ │ │ │ └ [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]
│ │ │ └ <function Server.serve at 0x10a4abca0>
│ │ └ <uvicorn.server.Server object at 0x109cd56a0>
│ └ <function BaseEventLoop.run_until_complete at 0x10a205820>
└ <_UnixSelectorEventLoop running=True closed=False debug=False>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
│ └ <function BaseEventLoop.run_forever at 0x10a205790>
└ <_UnixSelectorEventLoop running=True closed=False debug=False>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
│ └ <function BaseEventLoop._run_once at 0x10a209310>
└ <_UnixSelectorEventLoop running=True closed=False debug=False>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
handle._run()
│ └ <function Handle._run at 0x10a13ed30>
└ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
│ │ │ │ │ └ <member '_args' of 'Handle' objects>
│ │ │ │ └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
│ │ │ └ <member '_callback' of 'Handle' objects>
│ │ └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
│ └ <member '_context' of 'Handle' objects>
└ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
> File "./xxx/xxx/xxx.py", line 144, in get_disclosure_data
hh_json, db_json = await asyncio.gather(*coroutines)
│ │ └ [<coroutine object xxxx at 0x10bb2cb40>, <coroutine object db_call at 0x10bb2cc40>]
│ └ <function gather at 0x10a1fad30>
└ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
File "./xxx/xxx/xxx.py", line 52, in db_call
db_json = await asyncio.gather(*coroutines, loop=asyncio.get_event_loop())
│ │ │ │ └ <built-in function get_event_loop>
│ │ │ └ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
│ │ └ [<coroutine object DBConnectionManager.fetch_item at 0x10bb434c0>, <coroutine object DBConnectionManager.fetch_item at 0x10bb...
│ └ <function gather at 0x10a1fad30>
└ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
File "./xxx/xxx/xx.py", line 97, in fetch_item
await self._connection_pool.release(self.con)
│ │ │ │ └ <PoolConnectionProxy [released] 0x10bbc9cd0>
│ │ │ └ <chd_api.data.db.DBConnectionManager object at 0x10b946a30>
│ │ └ <function Pool.release at 0x10b956a60>
│ └ <asyncpg.pool.Pool object at 0x10bb131e0>
└ <chd_api.data.db.DBConnectionManager object at 0x10b946a30>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
return await asyncio.shield(ch.release(timeout))
│ │ │ │ └ None
│ │ │ └ <function PoolConnectionHolder.release at 0x10b952e50>
│ │ └ <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>
│ └ <function shield at 0x10a1faee0>
└ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
raise ex
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
await self._con.reset(timeout=budget)
│ │ └ None
│ └ <member '_con' of 'PoolConnectionHolder' objects>
└ <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
await self.execute(reset_query, timeout=timeout)
│ │ │ └ None
│ │ └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
│ └ <function Connection.execute at 0x10b93f3a0>
└ <asyncpg.connection.Connection object at 0x10bc34120>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
return await self._protocol.query(query, timeout)
│ │ │ └ None
│ │ └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
│ └ <member '_protocol' of 'Connection' objects>
└ <asyncpg.connection.Connection object at 0x10bc34120>
File "asyncpg/protocol/protocol.pyx", line 321, in query
self._check_state()
File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
raise apg_exc.InterfaceError(
│ └ <class 'asyncpg.exceptions._base.InterfaceError'>
└ <module 'asyncpg.exceptions' from '/Users/ddd/Desktop/repos/chd-api/.venv/lib/python3.8/site-packages/asyncpg/exception...
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
我有以下代码来设置连接池并使用池中的连接执行查询:
class DBConnectionManager(object):
""" Class for setting up and tearing down db connection """
def __init__(self):
self.host = SETTINGS.db_host
self.database = SETTINGS.db_name
self.user = SETTINGS.db_user
self.password = SETTINGS.db_password
self.port = "5432"
self._connection_pool = None
self.con = None
async def connect(self):
if not self._connection_pool:
try:
self._connection_pool = await asyncpg.create_pool(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
port=self.port,
min_size=50,
max_size=100,
)
logger.info("Database pool connection opened")
except Exception as e:
logger.exception(e)
async def fetch_item(self, query: str, *args):
if not self._connection_pool:
await self.connect()
else:
self.con = await self._connection_pool.acquire()
try:
result = await self.con.fetch(query, *args)
return result
except Exception as e:
logger.exception(e)
finally:
await self._connection_pool.release(self.con)
async def close(self):
if not self._connection_pool:
try:
await self._connection_pool.close()
logger.info("Database pool connection closed")
except Exception as e:
logger.exception(e)
并且正在尝试使用以下方法执行大约 22 个数据库调用:
async def db_call(db, lat, lng):
"""
Performs the necessary db calls given a lat, lng
Required Input:
lat::float a latitude in decimal degrees. Must be specified with `lng` (i.e. 39.2994)
lng::float a longitude in decimal degrees. Must be specified with `lat` (i.e. -122.33)
Returns:
dict
"""
coroutines = []
for table in db_map:
# SQL columns
db_fields = ",".join(
[
f"{col} AS {db_map[table]['fields'][col]}"
for col in db_map[table]["fields"]
]
)
# Output names
api_fields = [db_map[table]["fields"][col] for col in db_map[table]["fields"]]
if db_map[table]["query_type"] == "pip":
limit = db_map[table]["options"]["LIMIT"]
query = f"SELECT {db_fields} from {table} WHERE (ST_Covers(geom, GeomFromEWKT('SRID=4326;POINT({lng} {lat})'))) LIMIT {limit};"
else:
distance = db_map[table]["options"]["DISTANCE"]
geo2geo = f"geom::geography, GeomFromEWKT('SRID=4326;POINT({lng} {lat})')::geography"
query = (
f"SELECT {db_fields}, ST_Distance({geo2geo})"
f"from {table} WHERE (ST_DWithin({geo2geo}, {distance}))"
f"ORDER BY ST_Distance({geo2geo}) LIMIT 1;"
)
coroutines.append(db.fetch_item(query))
db_res = await asyncio.gather(*coroutines)
.... code for processing results
我已经检查了关于此错误的 asyncpg github 的几个问题,但仍未找到合适的解决方案。另请注意,此调用是在 FastAPI 中执行的。任何关于为什么这个错误可能 occurring/steps 来解决它的指导将不胜感激。
在fetch_item
中对self.con
的赋值导致多个协程共享同一个连接。虽然您确实希望它们共享连接池,但共享相同的 连接 没有意义,因为连接是有状态的。
要解决此问题,请将 self.con
的用法替换为局部变量 con
。
我正在尝试解决以下错误:
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
这是完整的回溯:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
│ │ └ 4
│ └ 7
└ <function _main at 0x109c8aca0>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 129, in _main
return self._bootstrap(parent_sentinel)
│ │ └ 4
│ └ <function BaseProcess._bootstrap at 0x109b1f8b0>
└ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
│ └ <function BaseProcess.run at 0x109b18ee0>
└ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
│ │ │ │ │ └ {'config': <uvicorn.config.Config object at 0x109cd55b0>, 'target': <bound method Server.run of <uvicorn.server.Server object...
│ │ │ │ └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
│ │ │ └ ()
│ │ └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
│ └ <function subprocess_started at 0x10a4aca60>
└ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/subprocess.py", line 61, in subprocess_started
target(sockets=sockets)
│ └ [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]
└ <bound method Server.run of <uvicorn.server.Server object at 0x109cd56a0>>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/server.py", line 48, in run
loop.run_until_complete(self.serve(sockets=sockets))
│ │ │ │ └ [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]
│ │ │ └ <function Server.serve at 0x10a4abca0>
│ │ └ <uvicorn.server.Server object at 0x109cd56a0>
│ └ <function BaseEventLoop.run_until_complete at 0x10a205820>
└ <_UnixSelectorEventLoop running=True closed=False debug=False>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
│ └ <function BaseEventLoop.run_forever at 0x10a205790>
└ <_UnixSelectorEventLoop running=True closed=False debug=False>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
│ └ <function BaseEventLoop._run_once at 0x10a209310>
└ <_UnixSelectorEventLoop running=True closed=False debug=False>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
handle._run()
│ └ <function Handle._run at 0x10a13ed30>
└ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
│ │ │ │ │ └ <member '_args' of 'Handle' objects>
│ │ │ │ └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
│ │ │ └ <member '_callback' of 'Handle' objects>
│ │ └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
│ └ <member '_context' of 'Handle' objects>
└ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
> File "./xxx/xxx/xxx.py", line 144, in get_disclosure_data
hh_json, db_json = await asyncio.gather(*coroutines)
│ │ └ [<coroutine object xxxx at 0x10bb2cb40>, <coroutine object db_call at 0x10bb2cc40>]
│ └ <function gather at 0x10a1fad30>
└ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
File "./xxx/xxx/xxx.py", line 52, in db_call
db_json = await asyncio.gather(*coroutines, loop=asyncio.get_event_loop())
│ │ │ │ └ <built-in function get_event_loop>
│ │ │ └ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
│ │ └ [<coroutine object DBConnectionManager.fetch_item at 0x10bb434c0>, <coroutine object DBConnectionManager.fetch_item at 0x10bb...
│ └ <function gather at 0x10a1fad30>
└ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
File "./xxx/xxx/xx.py", line 97, in fetch_item
await self._connection_pool.release(self.con)
│ │ │ │ └ <PoolConnectionProxy [released] 0x10bbc9cd0>
│ │ │ └ <chd_api.data.db.DBConnectionManager object at 0x10b946a30>
│ │ └ <function Pool.release at 0x10b956a60>
│ └ <asyncpg.pool.Pool object at 0x10bb131e0>
└ <chd_api.data.db.DBConnectionManager object at 0x10b946a30>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
return await asyncio.shield(ch.release(timeout))
│ │ │ │ └ None
│ │ │ └ <function PoolConnectionHolder.release at 0x10b952e50>
│ │ └ <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>
│ └ <function shield at 0x10a1faee0>
└ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
raise ex
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
await self._con.reset(timeout=budget)
│ │ └ None
│ └ <member '_con' of 'PoolConnectionHolder' objects>
└ <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
await self.execute(reset_query, timeout=timeout)
│ │ │ └ None
│ │ └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
│ └ <function Connection.execute at 0x10b93f3a0>
└ <asyncpg.connection.Connection object at 0x10bc34120>
File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
return await self._protocol.query(query, timeout)
│ │ │ └ None
│ │ └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
│ └ <member '_protocol' of 'Connection' objects>
└ <asyncpg.connection.Connection object at 0x10bc34120>
File "asyncpg/protocol/protocol.pyx", line 321, in query
self._check_state()
File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
raise apg_exc.InterfaceError(
│ └ <class 'asyncpg.exceptions._base.InterfaceError'>
└ <module 'asyncpg.exceptions' from '/Users/ddd/Desktop/repos/chd-api/.venv/lib/python3.8/site-packages/asyncpg/exception...
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
我有以下代码来设置连接池并使用池中的连接执行查询:
class DBConnectionManager(object):
""" Class for setting up and tearing down db connection """
def __init__(self):
self.host = SETTINGS.db_host
self.database = SETTINGS.db_name
self.user = SETTINGS.db_user
self.password = SETTINGS.db_password
self.port = "5432"
self._connection_pool = None
self.con = None
async def connect(self):
if not self._connection_pool:
try:
self._connection_pool = await asyncpg.create_pool(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
port=self.port,
min_size=50,
max_size=100,
)
logger.info("Database pool connection opened")
except Exception as e:
logger.exception(e)
async def fetch_item(self, query: str, *args):
if not self._connection_pool:
await self.connect()
else:
self.con = await self._connection_pool.acquire()
try:
result = await self.con.fetch(query, *args)
return result
except Exception as e:
logger.exception(e)
finally:
await self._connection_pool.release(self.con)
async def close(self):
if not self._connection_pool:
try:
await self._connection_pool.close()
logger.info("Database pool connection closed")
except Exception as e:
logger.exception(e)
并且正在尝试使用以下方法执行大约 22 个数据库调用:
async def db_call(db, lat, lng):
"""
Performs the necessary db calls given a lat, lng
Required Input:
lat::float a latitude in decimal degrees. Must be specified with `lng` (i.e. 39.2994)
lng::float a longitude in decimal degrees. Must be specified with `lat` (i.e. -122.33)
Returns:
dict
"""
coroutines = []
for table in db_map:
# SQL columns
db_fields = ",".join(
[
f"{col} AS {db_map[table]['fields'][col]}"
for col in db_map[table]["fields"]
]
)
# Output names
api_fields = [db_map[table]["fields"][col] for col in db_map[table]["fields"]]
if db_map[table]["query_type"] == "pip":
limit = db_map[table]["options"]["LIMIT"]
query = f"SELECT {db_fields} from {table} WHERE (ST_Covers(geom, GeomFromEWKT('SRID=4326;POINT({lng} {lat})'))) LIMIT {limit};"
else:
distance = db_map[table]["options"]["DISTANCE"]
geo2geo = f"geom::geography, GeomFromEWKT('SRID=4326;POINT({lng} {lat})')::geography"
query = (
f"SELECT {db_fields}, ST_Distance({geo2geo})"
f"from {table} WHERE (ST_DWithin({geo2geo}, {distance}))"
f"ORDER BY ST_Distance({geo2geo}) LIMIT 1;"
)
coroutines.append(db.fetch_item(query))
db_res = await asyncio.gather(*coroutines)
.... code for processing results
我已经检查了关于此错误的 asyncpg github 的几个问题,但仍未找到合适的解决方案。另请注意,此调用是在 FastAPI 中执行的。任何关于为什么这个错误可能 occurring/steps 来解决它的指导将不胜感激。
在fetch_item
中对self.con
的赋值导致多个协程共享同一个连接。虽然您确实希望它们共享连接池,但共享相同的 连接 没有意义,因为连接是有状态的。
要解决此问题,请将 self.con
的用法替换为局部变量 con
。