Running concurrent mongoengine queries with asyncio

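I have 4 functions that basically build queries and execute them. I want to run them concurrently using asyncio. My asyncio implementation seems correct, because the functions do run concurrently, as they should, when given non-MongoDB tasks (for example asyncio.sleep()). Here is the code: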

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
    service.async_get_associate_opportunity_count_by_user(me, criteria),
    service.get_new_associate_opportunity_count_by_user(me, criteria),
    service.async_get_associate_favorites_count(me, criteria=dict()),
    service.get_group_matched_opportunities_count_by_user(me, criteria),
]

available, new, favorites, group_matched = loop.run_until_complete(asyncio.gather(*tasks))

stats['opportunities']['available'] = available
stats['opportunities']['new'] = new
stats['opportunities']['favorites'] = favorites
stats['opportunities']['group_matched'] = group_matched

loop.close()


# functions written in other file

@asyncio.coroutine
def async_get_ass(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = **query that gets built from some other functions**

    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of available: {}".format(run_time))
    yield from asyncio.sleep(2)
    return opportunities

@asyncio.coroutine
def get_new_associate_opportunity_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = **query that gets built from some other functions**
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of new: {}".format(run_time))
    yield from asyncio.sleep(2)
    return opportunities

@asyncio.coroutine
def async_get_associate_favorites_count(self, user, criteria={}, **kwargs):
    start_time = time.time()
    query = **query that gets built from some other functions**
    favorites = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of favorites: {}".format(run_time))
    yield from asyncio.sleep(2)
    return favorites

@asyncio.coroutine
def get_group_matched_opportunities_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = **query that gets built from some other functions**
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of group matched: {}".format(run_time))
    yield from asyncio.sleep(2)
    return opportunities

The yield from asyncio.sleep(2) is only there to show that the functions run asynchronously. This is the output on the terminal:

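runtime of group matched: 0.11431598663330078
runtime of favorites: 0.0029871463775634766
runtime of timestamp function: 0.0004897117614746094
runtime of new: 0.15225648880004883
runtime of available: 0.13006806373596191
total runtime: 2403.2700061798096

From my understanding, apart from the 2000 ms added to the total runtime by the sleep call, the total should not be more than 155-160 ms, since that is the longest runtime among all the functions.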

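I'm currently looking into motorengine (a port of mongoengine 0.9.0), which apparently enables asynchronous MongoDB queries, but I don't think I'll be able to use it, since my models are defined using mongoengine. Is there a workaround for this problem?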

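The reason your queries aren't executed in parallel is that whenever you run Opportunity.objects(query).count() in a coroutine, the whole event loop blocks, because those methods perform blocking IO.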
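The effect can be reproduced without a database. The following is a minimal sketch, assuming time.sleep() as a stand-in for the blocking Opportunity.objects(query).count() call; the names blocking_count, non_blocking_count and timed_gather are hypothetical, and async/await is used in place of @asyncio.coroutine so that it runs on current Python versions:

```python
import asyncio
import time


async def blocking_count(delay):
    # Stand-in for a synchronous driver call: time.sleep() never yields
    # control to the event loop, so no other coroutine can run meanwhile.
    time.sleep(delay)
    return delay


async def non_blocking_count(delay):
    # asyncio.sleep() suspends this coroutine and lets the others proceed.
    await asyncio.sleep(delay)
    return delay


async def timed_gather(factory, delays):
    # Run one coroutine per delay with gather() and measure the wall time.
    start = time.perf_counter()
    results = await asyncio.gather(*(factory(d) for d in delays))
    return results, time.perf_counter() - start


async def main():
    delays = [0.1, 0.1, 0.1, 0.1]
    _, blocking_elapsed = await timed_gather(blocking_count, delays)
    _, non_blocking_elapsed = await timed_gather(non_blocking_count, delays)
    return blocking_elapsed, non_blocking_elapsed


if __name__ == "__main__":
    blocking_elapsed, non_blocking_elapsed = asyncio.run(main())
    print("blocking: {:.2f}s".format(blocking_elapsed))
    print("non-blocking: {:.2f}s".format(non_blocking_elapsed))
```

The blocking variant should take roughly the sum of the delays, while the non-blocking variant should take roughly the longest single delay. That is the same pattern as in your output, where the total is about the sum of the individual runtimes plus the 2 seconds of sleep.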

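So you need a MongoDB driver that can do async/non-blocking IO. You're on the right track in trying motorengine, but as far as I can tell it is written for the Tornado asynchronous framework. To get it to work with asyncio, you'll have to bridge Tornado and asyncio. See http://tornado.readthedocs.org/en/latest/asyncio.html for how to do that.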

Another option would be to use asyncio-mongo, but it doesn't have a mongoengine-compatible ORM, so you might have to rewrite most of your code.
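A further possibility, not one of the options above and offered only as a sketch: keep the existing mongoengine models and hand each blocking call to a thread pool with loop.run_in_executor(), so that the event loop stays free while the queries wait on IO. Here blocking_count is a hypothetical stand-in for Opportunity.objects(query).count(), and gather_counts is likewise an illustrative name. The sketch assumes the underlying driver can safely be called from several threads at once, which is worth verifying for your setup.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_count(name, delay):
    # Hypothetical stand-in for a synchronous query such as
    # Opportunity.objects(query).count(); time.sleep() simulates the IO wait.
    time.sleep(delay)
    return name


async def gather_counts(jobs):
    loop = asyncio.get_running_loop()
    # One worker per job so every blocking call can wait at the same time.
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [
            loop.run_in_executor(pool, blocking_count, name, delay)
            for name, delay in jobs
        ]
        # gather() returns results in the order the futures were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    jobs = [
        ("available", 0.2),
        ("new", 0.2),
        ("favorites", 0.2),
        ("group_matched", 0.2),
    ]
    start = time.perf_counter()
    results = asyncio.run(gather_counts(jobs))
    print(results, "{:.2f}s".format(time.perf_counter() - start))
```

With this approach the coroutines themselves stay thin wrappers, and the model and query-building code does not need to change. The trade-off is that concurrency is limited by the size of the thread pool rather than handled by a truly non-blocking driver.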