连接redis时异步代码失败
Asynchronous code failure when connecting to redis
我创建了一个小的 class 来执行 redis 的基本操作,使用 aioredis。
class RedisService:
def __init__(self, r_url) -> str:
self.redis = r_url
async def create_connection(self):
return await aioredis.create_redis(self.redis)
async def _get(self, key) -> str:
try:
return await self.create_connection().get(key, encoding='utf-8')
finally:
await self._close()
async def _set(self, key, value) -> None:
await self.create_connection().set(key, value)
await self._close()
async def _close(self) -> None:
self.create_connection().close()
await self._redis.wait_closed()
以及调用 rediswrite/read 操作的测试处理程序
@router.post('/perform')
async def index():
key = 'test'
value = 'test'
value = await RedisService(r_url)._set(key, value)
return {'result': value}
但是报错
await self.create_connection.set(key, value)
AttributeError: ''coroutine'' object has no attribute 'set'
我猜问题可能是异步代码必须 运行 通过事件循环
asyncio.run(some coroutine)
但我无法理解如何将此逻辑构建到我的代码中
event_loop 由 uvicorn 在启动 fastapi 应用程序时提供,可以负责以异步方式调用 redis get/set 值。
参考 -> https://fastapi.tiangolo.com/tutorial/first-steps/
“https://github.com/tiangolo/fastapi/issues/1694”中的以下代码片段已更改以适合问题中的提问。
将创建连接结果移入对象状态有助于处理网络调用的异步解析。
当查询路径“/”时,保持连接的对象状态将用于
以异步方式将结果设置为 redis。
from fastapi import FastAPI
from connections import redis_cache
app = FastAPI()
@app.on_event('startup')
async def startup_event():
await redis_cache.create_connection(r_url="redis://localhost")
@app.on_event('shutdown')
async def shutdown_event():
await redis_cache._close()
@app.get("/")
async def root():
key = 'key'
value = 'data'
await redis_cache._set(key, value)
@app.get("/value")
async def get_value():
key = 'key'
return await redis_cache._get(key)
from typing import Optional
from aioredis import Redis, create_redis
class RedisCache:
def __init__(self) -> str:
self.redis_cache = None
async def create_connection(self,r_url):
self.redis_cache = await create_redis(r_url)
async def _get(self, key) -> str:
return await self.redis_cache.get(key)
async def _set(self, key, value) -> None:
await self.redis_cache.set(key, value)
async def _close(self) -> None:
self.redis_cache.close()
await self.redis_cache.wait_closed()
redis_cache = RedisCache()
你的问题是你如何使用create_connection
。您必须调用它并等待它 returns.
await self.create_connection()
然后您还需要等待 set
和 get
。作为单行,这会变得混乱。
await (await self.create_connection()).set(key, value)
为了帮助解决这个问题,您应该将等待拆分为单独的语句。
conn = await self.create_connection()
await conn.set(key, value)
每次需要执行操作时都创建一个新连接可能会很昂贵。我建议以一种或两种方式更改 create_connection
。
要么让它连接到您的实例
async def create_connection(self):
self.conn = await aioredis.create_redis(self.redis)
实例化RedisService
然后使用
可以调用这个
await self.conn.set(key, value)
或者您可以切换到使用连接池。
async def create_connection(self):
return await aioredis.create_redis_pool(self.redis)
我创建了一个小的 class 来执行 redis 的基本操作,使用 aioredis。
class RedisService:
def __init__(self, r_url) -> str:
self.redis = r_url
async def create_connection(self):
return await aioredis.create_redis(self.redis)
async def _get(self, key) -> str:
try:
return await self.create_connection().get(key, encoding='utf-8')
finally:
await self._close()
async def _set(self, key, value) -> None:
await self.create_connection().set(key, value)
await self._close()
async def _close(self) -> None:
self.create_connection().close()
await self._redis.wait_closed()
以及调用 rediswrite/read 操作的测试处理程序
@router.post('/perform')
async def index():
key = 'test'
value = 'test'
value = await RedisService(r_url)._set(key, value)
return {'result': value}
但是报错
await self.create_connection.set(key, value)
AttributeError: ''coroutine'' object has no attribute 'set'
我猜问题可能是异步代码必须 运行 通过事件循环
asyncio.run(some coroutine)
但我无法理解如何将此逻辑构建到我的代码中
event_loop 由 uvicorn 在启动 fastapi 应用程序时提供,可以负责以异步方式调用 redis get/set 值。 参考 -> https://fastapi.tiangolo.com/tutorial/first-steps/
“https://github.com/tiangolo/fastapi/issues/1694”中的以下代码片段已更改以适合问题中的提问。
将创建连接结果移入对象状态有助于处理网络调用的异步解析。
当查询路径“/”时,保持连接的对象状态将用于 以异步方式将结果设置为 redis。
from fastapi import FastAPI
from connections import redis_cache
app = FastAPI()
@app.on_event('startup')
async def startup_event():
await redis_cache.create_connection(r_url="redis://localhost")
@app.on_event('shutdown')
async def shutdown_event():
await redis_cache._close()
@app.get("/")
async def root():
key = 'key'
value = 'data'
await redis_cache._set(key, value)
@app.get("/value")
async def get_value():
key = 'key'
return await redis_cache._get(key)
from typing import Optional
from aioredis import Redis, create_redis
class RedisCache:
def __init__(self) -> str:
self.redis_cache = None
async def create_connection(self,r_url):
self.redis_cache = await create_redis(r_url)
async def _get(self, key) -> str:
return await self.redis_cache.get(key)
async def _set(self, key, value) -> None:
await self.redis_cache.set(key, value)
async def _close(self) -> None:
self.redis_cache.close()
await self.redis_cache.wait_closed()
redis_cache = RedisCache()
你的问题是你如何使用create_connection
。您必须调用它并等待它 returns.
await self.create_connection()
然后您还需要等待 set
和 get
。作为单行,这会变得混乱。
await (await self.create_connection()).set(key, value)
为了帮助解决这个问题,您应该将等待拆分为单独的语句。
conn = await self.create_connection()
await conn.set(key, value)
每次需要执行操作时都创建一个新连接可能会很昂贵。我建议以一种或两种方式更改 create_connection
。
要么让它连接到您的实例
async def create_connection(self):
self.conn = await aioredis.create_redis(self.redis)
实例化RedisService
然后使用
await self.conn.set(key, value)
或者您可以切换到使用连接池。
async def create_connection(self):
return await aioredis.create_redis_pool(self.redis)