如何在 celery 任务中使用 asyncio 和 aioredis 锁?
How to use asyncio and aioredis lock inside celery tasks?
目标:
- 可以 运行 asyncio 协程。
- 纠正 celery 异常行为和任务重试。
- 可以使用 aioredis 锁。
那么,如何运行正确的异步任务来达到目的呢?
什么是RuntimeError: await wasn't used with future
(下),我该如何解决?
我已经试过了:
1。 asgiref
async_to_sync
(来自 asgiref https://pypi.org/project/asgiref/)。
此选项使 运行 asyncio 协程成为可能,但重试功能不起作用。
2。芹菜池异步
(https://pypi.org/project/celery-pool-asyncio/)
与 asgiref 中的问题相同。 (此选项使 运行 asyncio 协程成为可能,但重试功能不起作用。)
3。编写自己的异步同步装饰器
我已经尝试创建自己的装饰器,例如 async_to_sync,运行 协程线程安全 (asyncio.run_coroutine_threadsafe
),但我的行为如上所述。
4。异步模块
我也在 celery 任务中尝试 asyncio.run()
或 asyncio.get_event_loop().run_until_complete()
(和 self.retry(...)
)。 这有效 很好,任务 运行s,重试有效,但是 协程执行不正确 - 在 async
函数 I无法使用 aioredis。
实施说明:
- 启动芹菜命令:
celery -A celery_test.celery_app worker -l info -n worker1 -P gevent --concurrency=10 --without-gossip --without-mingle
- celery 应用:
transport = f"redis://localhost/9"
celery_app = Celery("worker", broker=transport, backend=transport,
include=['tasks'])
celery_app.conf.broker_transport_options = {
'visibility_timeout': 60 * 60 * 24,
'fanout_prefix': True,
'fanout_patterns': True
}
- 工具:
@contextmanager
def temp_asyncio_loop():
# asyncio.get_event_loop() automatically creates event loop only for main thread
try:
prev_loop = asyncio.get_event_loop()
except RuntimeError:
prev_loop = None
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
loop.stop()
loop.close()
del loop
asyncio.set_event_loop(prev_loop)
def with_temp_asyncio_loop(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
with temp_asyncio_loop() as t_loop:
return f(*args, loop=t_loop, **kwargs)
return wrapper
def await_(coro):
return asyncio.get_event_loop().run_until_complete(coro)
- 任务:
@celery_app.task(bind=True, max_retries=30, default_retry_delay=0)
@with_temp_asyncio_loop
def debug(self, **kwargs):
try:
await_(debug_async())
except Exception as exc:
self.retry(exc=exc)
async def debug_async():
async with RedisLock(f'redis_lock_{datetime.now()}'):
pass
- redis锁
class RedisLockException(Exception):
pass
class RedisLock(AsyncContextManager):
"""
Redis Lock class
:param lock_id: string (unique key)
:param value: dummy value
:param expire: int (time in seconds that key will storing)
:param expire_on_delete: int (time in seconds, set pause before deleting)
Usage:
try:
with RedisLock('123_lock', 5 * 60):
# do something
except RedisLockException:
"""
def __init__(self, lock_id: str, value='1', expire: int = 4, expire_on_delete: int = None):
self.lock_id = lock_id
self.expire = expire
self.value = value
self.expire_on_delete = expire_on_delete
async def acquire_lock(self):
return await redis.setnx(self.lock_id, self.value)
async def release_lock(self):
if self.expire_on_delete is None:
return await redis.delete(self.lock_id)
else:
await redis.expire(self.lock_id, self.expire_on_delete)
async def __aenter__(self, *args, **kwargs):
if not await self.acquire_lock():
raise RedisLockException({
'redis_lock': 'The process: {} still run, try again later'.format(await redis.get(self.lock_id))
})
await redis.expire(self.lock_id, self.expire)
async def __aexit__(self, exc_type, exc_value, traceback):
await self.release_lock()
在我的 windows 机器上 await redis.setnx(...)
阻塞了 celery worker 并且它停止生成日志并且 Ctrl+C
不工作。
在 docker 容器内,我收到一个错误。有部分回溯:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 854, in read_response
response = await self._parser.read_response()
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 366, in read_response
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
aioredis.exceptions.ConnectionError: Connection closed by server.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 451, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 734, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 54, in run
ret = task.retry(exc=exc, **retry_kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/task.py", line 717, in retry
raise_with_context(exc)
File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 34, in run
return task._orig_run(*args, **kwargs)
File "/app/celery_tasks/tasks.py", line 69, in wrapper
return f(*args, **kwargs) # <--- inside with_temp_asyncio_loop from utils
...
File "/usr/local/lib/python3.9/contextlib.py", line 575, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/app/db/redis.py", line 50, in __aenter__
if not await self.acquire_lock():
File "/app/db/redis.py", line 41, in acquire_lock
return await redis.setnx(self.lock_id, self.value)
File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1064, in execute_command
return await self.parse_response(conn, command_name, **options)
File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1080, in parse_response
response = await connection.read_response()
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 859, in read_response
await self.disconnect()
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 762, in disconnect
await self._writer.wait_closed()
File "/usr/local/lib/python3.9/asyncio/streams.py", line 359, in wait_closed
await self._protocol._get_close_waiter(self)
RuntimeError: await wasn't used with future
- 库版本:
celery==5.2.1
aioredis==2.0.0
也许有帮助。 https://github.com/aio-libs/aioredis-py/issues/1273
要点是:
replace all the calls to get_event_loop to get_running_loop
which would remove that Runtime exception when a future is attached to
a different loop.
使用solo
池,然后在运行任务函数中创建装饰器
asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
并使您的任务异步
def sync(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
return wrapper
@celery_app.task()
@sync
async def task():
...
目标:
- 可以 运行 asyncio 协程。
- 纠正 celery 异常行为和任务重试。
- 可以使用 aioredis 锁。
那么,如何运行正确的异步任务来达到目的呢?
什么是RuntimeError: await wasn't used with future
(下),我该如何解决?
我已经试过了:
1。 asgiref
async_to_sync
(来自 asgiref https://pypi.org/project/asgiref/)。
此选项使 运行 asyncio 协程成为可能,但重试功能不起作用。
2。芹菜池异步
(https://pypi.org/project/celery-pool-asyncio/)
与 asgiref 中的问题相同。 (此选项使 运行 asyncio 协程成为可能,但重试功能不起作用。)
3。编写自己的异步同步装饰器
我已经尝试创建自己的装饰器,例如 async_to_sync,运行 协程线程安全 (asyncio.run_coroutine_threadsafe
),但我的行为如上所述。
4。异步模块
我也在 celery 任务中尝试 asyncio.run()
或 asyncio.get_event_loop().run_until_complete()
(和 self.retry(...)
)。 这有效 很好,任务 运行s,重试有效,但是 协程执行不正确 - 在 async
函数 I无法使用 aioredis。
实施说明:
- 启动芹菜命令:
celery -A celery_test.celery_app worker -l info -n worker1 -P gevent --concurrency=10 --without-gossip --without-mingle
- celery 应用:
transport = f"redis://localhost/9"
celery_app = Celery("worker", broker=transport, backend=transport,
include=['tasks'])
celery_app.conf.broker_transport_options = {
'visibility_timeout': 60 * 60 * 24,
'fanout_prefix': True,
'fanout_patterns': True
}
- 工具:
@contextmanager
def temp_asyncio_loop():
# asyncio.get_event_loop() automatically creates event loop only for main thread
try:
prev_loop = asyncio.get_event_loop()
except RuntimeError:
prev_loop = None
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
loop.stop()
loop.close()
del loop
asyncio.set_event_loop(prev_loop)
def with_temp_asyncio_loop(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
with temp_asyncio_loop() as t_loop:
return f(*args, loop=t_loop, **kwargs)
return wrapper
def await_(coro):
return asyncio.get_event_loop().run_until_complete(coro)
- 任务:
@celery_app.task(bind=True, max_retries=30, default_retry_delay=0)
@with_temp_asyncio_loop
def debug(self, **kwargs):
try:
await_(debug_async())
except Exception as exc:
self.retry(exc=exc)
async def debug_async():
async with RedisLock(f'redis_lock_{datetime.now()}'):
pass
- redis锁
class RedisLockException(Exception):
pass
class RedisLock(AsyncContextManager):
"""
Redis Lock class
:param lock_id: string (unique key)
:param value: dummy value
:param expire: int (time in seconds that key will storing)
:param expire_on_delete: int (time in seconds, set pause before deleting)
Usage:
try:
with RedisLock('123_lock', 5 * 60):
# do something
except RedisLockException:
"""
def __init__(self, lock_id: str, value='1', expire: int = 4, expire_on_delete: int = None):
self.lock_id = lock_id
self.expire = expire
self.value = value
self.expire_on_delete = expire_on_delete
async def acquire_lock(self):
return await redis.setnx(self.lock_id, self.value)
async def release_lock(self):
if self.expire_on_delete is None:
return await redis.delete(self.lock_id)
else:
await redis.expire(self.lock_id, self.expire_on_delete)
async def __aenter__(self, *args, **kwargs):
if not await self.acquire_lock():
raise RedisLockException({
'redis_lock': 'The process: {} still run, try again later'.format(await redis.get(self.lock_id))
})
await redis.expire(self.lock_id, self.expire)
async def __aexit__(self, exc_type, exc_value, traceback):
await self.release_lock()
在我的 windows 机器上 await redis.setnx(...)
阻塞了 celery worker 并且它停止生成日志并且 Ctrl+C
不工作。
在 docker 容器内,我收到一个错误。有部分回溯:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 854, in read_response
response = await self._parser.read_response()
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 366, in read_response
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
aioredis.exceptions.ConnectionError: Connection closed by server.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 451, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 734, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 54, in run
ret = task.retry(exc=exc, **retry_kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/task.py", line 717, in retry
raise_with_context(exc)
File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 34, in run
return task._orig_run(*args, **kwargs)
File "/app/celery_tasks/tasks.py", line 69, in wrapper
return f(*args, **kwargs) # <--- inside with_temp_asyncio_loop from utils
...
File "/usr/local/lib/python3.9/contextlib.py", line 575, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/app/db/redis.py", line 50, in __aenter__
if not await self.acquire_lock():
File "/app/db/redis.py", line 41, in acquire_lock
return await redis.setnx(self.lock_id, self.value)
File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1064, in execute_command
return await self.parse_response(conn, command_name, **options)
File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1080, in parse_response
response = await connection.read_response()
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 859, in read_response
await self.disconnect()
File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 762, in disconnect
await self._writer.wait_closed()
File "/usr/local/lib/python3.9/asyncio/streams.py", line 359, in wait_closed
await self._protocol._get_close_waiter(self)
RuntimeError: await wasn't used with future
- 库版本:
celery==5.2.1
aioredis==2.0.0
也许有帮助。 https://github.com/aio-libs/aioredis-py/issues/1273
要点是:
replace all the calls to get_event_loop to get_running_loop which would remove that Runtime exception when a future is attached to a different loop.
使用solo
池,然后在运行任务函数中创建装饰器
asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
并使您的任务异步
def sync(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
return wrapper
@celery_app.task()
@sync
async def task():
...