asyncio python coroutine cancelled while task still pending reading from redis channel

I have multiple coroutines, each waiting on content in a queue to start processing.

The contents of the queues are populated by channel subscribers, whose only job is to receive messages and push an item into the appropriate queue.

After the data is consumed by one queue processor and new data is generated, it is dispatched to the appropriate message channel, where the process repeats until the data is ready to be relayed to the API that serves it.

import asyncio
from random import randint
from Models.ConsumerStrategies import Strategy
from Helpers.Log import Log
import Connectors.datastore as ds

import json
__name__ = "Consumer"

MIN = 1
MAX = 4

async def consume(configuration: dict, queue: str, processor: Strategy) -> None:
    """Consumes new items in queue and publish a message into the appropriate channel with the data generated for the next consumer,
    if no new content is available sleep for a random number of seconds between MIN and MAX global variables

    Args:
        configuration (dict): configuration dictionary
        queue (str): queue being consumed
        processor (Strategy): consumer strategy
    """
    
    logger = Log().get_logger(processor.__name__, configuration['logFolder'], configuration['logFormat'], configuration['USE'])
    while True:
        try:
            ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
            token = await ds_handle.lpop(queue)
            if token is not None:
                result = await processor.consume(json.loads(token), ds_handle)
                status = await processor.relay(result, ds_handle)
                logger.debug(status)
            else:
                wait_for = randint(MIN,MAX)
                logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
                await asyncio.sleep(wait_for)
            ds_handle.close()
        except Exception as e:
            logger.error(f"{e}")
            logger.error(f"{e.with_traceback}")

I noticed that after about 24 hours of running I was getting these errors:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<consume() running at Services/Consumer.py:26> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29cbe0>()]> cb=[_chain_future.<locals>._call_set_state() at asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-426485' coro=<RedisConnection._read_data() done, defined at aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29ccd0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at aioredis/connection.py:168]>

I'm not sure how to interpret, solve, or recover from this; my first assumption is that I should switch to redis streams instead of using channels and queues.
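
For reference, here is a minimal sketch of what one hop of the pipeline could look like with a stream instead of a list plus a pub/sub channel, written against the redis-py style API (the stream name, connection URL and payload field are illustrative; aioredis 1.x exposes stream commands too, with a slightly different signature):

import asyncio
import json
import aioredis  # 2.x, redis-py style API

async def stream_consumer(stream: str) -> None:
    redis = aioredis.from_url("redis://localhost")
    last_id = "0"  # or "$" to read only entries newer than the call
    while True:
        # block up to 5 seconds waiting for new entries instead of polling
        entries = await redis.xread({stream: last_id}, count=10, block=5000)
        for _name, messages in entries:
            for msg_id, fields in messages:
                last_id = msg_id
                print(json.loads(fields[b"payload"]))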

However, going back to this scenario: I have the channel subscribers in a separate process, while the consumers run in the same process as the loop, as separate tasks.

What I assume is happening here is that, since the consumers are basically polling a queue, at some point the connection pool manager or redis itself starts hanging up on the connections opened by the consumers, and those get cancelled.

I don't see any further messages from that queue processor, but I also see the wait_for future, and I'm not sure whether it might come from the subscriber's ensure_future on the message reader:

import asyncio
from multiprocessing import process
from Helpers.Log import Log
import Services.Metas as metas
import Models.SubscriberStrategies as processor
import Connectors.datastore as ds_linker
import Models.Exceptions as Exceptions

async def subscriber(conf: dict, channel: str, processor: processor.Strategy) -> None:
    """Subscription handler. Receives the channel name, datastore connection and a parsing strategy.
    Creates a task that listens on the channel and process every message and processing strategy for the specific message

    Args:
        conf (dict): configuration dictionary
        channel (str): channel to subscribe to
        ds (aioredis.connection): connection handler to datastore
        processor (processor.Strategy): processor message handler
    """
    async def reader(ch):
        while await ch.wait_message():
            msg = await ch.get_json()
            await processor.handle_message(msg=msg)

    ds_uri = ds_linker.get_uri(conf=conf)
    ds = await ds_linker.get_datastore_handle(ds_uri)
    pub = await ds.subscribe(channel)
    ch = pub[0]
    tsk = asyncio.ensure_future(reader(ch))
    await tsk
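
From what I gather, the "Task was destroyed but it is pending!" warning is emitted when the event loop gets torn down while a task has not finished yet, so part of the fix is presumably to cancel and await the pending tasks before closing the loop. A minimal sketch of what I mean, with illustrative names:

tasks = [asyncio.ensure_future(reader(ch))]
try:
    await asyncio.gather(*tasks)
finally:
    for tsk in tasks:
        tsk.cancel()
    # await the cancelled tasks; return_exceptions swallows CancelledError
    await asyncio.gather(*tasks, return_exceptions=True)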

I could use some help sorting this out and properly understanding what's going on under the hood. Thanks!

It took a few days to reproduce the issue, and I found people with the same problem in the issues of the aioredis GitHub repo.

So I had to go through every place a connection to redis is opened and closed, to make sure to add:

        ds_handle.close()
        await ds_handle.wait_closed()
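
Since that pair is easy to forget, one option is to wrap it in an async context manager. This is a sketch of a hypothetical helper around the existing Connectors.datastore module, not something aioredis provides:

from contextlib import asynccontextmanager
import Connectors.datastore as ds

@asynccontextmanager
async def datastore_handle(configuration: dict):
    handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
    try:
        yield handle
    finally:
        # always close and wait until the connection is fully released
        handle.close()
        await handle.wait_closed()

# usage in the consumer loop:
# async with datastore_handle(configuration) as ds_handle:
#     token = await ds_handle.lpop(queue)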

I also worked on improving the exception handling in the consumer:

import traceback  # added at module level

while True:
    ds_handle = None
    try:
        ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
        token = await ds_handle.lpop(queue)
        if token is not None:
            result = await processor.consume(json.loads(token), ds_handle)
            status = await processor.relay(result, ds_handle)
            logger.debug(status)
        else:
            wait_for = randint(MIN, MAX)
            logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
            await asyncio.sleep(wait_for)
    except Exception as e:
        logger.error(f"{e}")
        # format_exc() returns the traceback as a string; print_exc() would
        # only print to stderr and log "None" here
        logger.error(traceback.format_exc())
    finally:
        # guard: the handle may never have been created if connecting failed
        if ds_handle is not None:
            ds_handle.close()
            await ds_handle.wait_closed()

Same for the producer:

ds = None
try:
    async def reader(ch):
        while await ch.wait_message():
            msg = await ch.get_json()
            await processor.handle_message(msg=msg)

    ds_uri = ds_linker.get_uri(conf=conf)
    ds = await ds_linker.get_datastore_handle(ds_uri)
    pub = await ds.subscribe(channel)
    ch = pub[0]
    tsk = asyncio.ensure_future(reader(ch))
    await tsk
except Exception as e:
    logger.debug(f'{e}')
    logger.error(f'{traceback.format_exc()}')
finally:
    # guard: ds is only bound once the connection has been established
    if ds is not None:
        ds.close()
        await ds.wait_closed()

This way there is never a connection left open with redis that, over time, could end up killing one of the processors' coroutines.

For me this solved the problem: at the time of writing, it's been more than 2 weeks of uptime with no more incidents of this kind.

Anyway, there is also a new major version of aioredis, and it's really recent news (this was on 1.3.1, while 2.0.0 is supposed to use the same model as redis-py, so things have changed in the meantime as well).
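
For anyone migrating: under 2.0 the subscriber above would look roughly like this, assuming the redis-py style pub/sub API (the connection URL is illustrative, and processor is the same strategy object as in the original code):

import json
import aioredis  # 2.x

async def subscriber_v2(channel: str, processor) -> None:
    """Same role as subscriber() above, against the 2.x API."""
    redis = aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)
    async for message in pubsub.listen():
        # listen() also yields subscribe confirmations; keep real messages only
        if message["type"] == "message":
            await processor.handle_message(msg=json.loads(message["data"]))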