连接redis时异步代码失败

Asynchronous code failure when connecting to redis

我创建了一个小的 class 来执行 redis 的基本操作,使用 aioredis。

class RedisService:
    def __init__(self, r_url) -> str:
        self.redis = r_url

    async def create_connection(self):
        return await aioredis.create_redis(self.redis)

    async def _get(self, key) -> str:
        try:
            return await self.create_connection().get(key, encoding='utf-8')
        finally:
            await self._close()

    async def _set(self, key, value) -> None:
        await self.create_connection().set(key, value)
        await self._close()

    async def _close(self) -> None:
        self.create_connection().close()
        await self._redis.wait_closed() 

以及调用 rediswrite/read 操作的测试处理程序

@router.post('/perform')
async def index():
    key = 'test'
    value = 'test'
    value = await RedisService(r_url)._set(key, value)
    return {'result': value}

但是报错

    await self.create_connection.set(key, value)
AttributeError: ''coroutine'' object has no attribute 'set'

我猜问题可能是异步代码必须 运行 通过事件循环

asyncio.run(some coroutine)

但我无法理解如何将此逻辑构建到我的代码中

event_loop 由 uvicorn 在启动 fastapi 应用程序时提供,可以负责以异步方式调用 redis get/set 值。 参考 -> https://fastapi.tiangolo.com/tutorial/first-steps/

“https://github.com/tiangolo/fastapi/issues/1694”中的以下代码片段已更改以适合问题中的提问。

将创建连接结果移入对象状态有助于处理网络调用的异步解析。

当查询路径“/”时,保持连接的对象状态将用于 以异步方式将结果设置为 redis。

from fastapi import FastAPI
from connections import redis_cache

app = FastAPI()

@app.on_event('startup')
async def startup_event():
    await redis_cache.create_connection(r_url="redis://localhost")

@app.on_event('shutdown')
async def shutdown_event():
    await redis_cache._close()

@app.get("/")
async def root():
    key = 'key'
    value = 'data'
    await redis_cache._set(key, value)

@app.get("/value")
async def get_value():
    key = 'key'
    return await redis_cache._get(key)

from typing import Optional
from aioredis import Redis, create_redis


class RedisCache:
    def __init__(self) -> str:
        self.redis_cache = None

    async def create_connection(self,r_url):
        self.redis_cache = await create_redis(r_url)

    async def _get(self, key) -> str:
        return await self.redis_cache.get(key)

    async def _set(self, key, value) -> None:
        await self.redis_cache.set(key, value)

    async def _close(self) -> None:
        self.redis_cache.close()
        await self.redis_cache.wait_closed()

redis_cache = RedisCache()

你的问题是你如何使用create_connection。您必须调用它并等待它 returns.

await self.create_connection()

然后您还需要等待 setget。作为单行,这会变得混乱。

await (await self.create_connection()).set(key, value)

为了帮助解决这个问题,您应该将等待拆分为单独的语句。

conn = await self.create_connection()
await conn.set(key, value)

每次需要执行操作时都创建一个新连接可能会很昂贵。我建议以一种或两种方式更改 create_connection

要么让它连接到您的实例

async def create_connection(self):
    self.conn = await aioredis.create_redis(self.redis)

实例化RedisService然后使用

可以调用这个
await self.conn.set(key, value)

或者您可以切换到使用连接池。

async def create_connection(self):
    return await aioredis.create_redis_pool(self.redis)