Running concurrent mongoengine queries with asyncio
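I have 4 functions that basically build queries and execute them. I want to run them concurrently using asyncio. My asyncio implementation seems to be correct, because non-MongoDB tasks (for example `asyncio.sleep()`) run concurrently as they should. Here is the code: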
```python
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
    service.async_get_associate_opportunity_count_by_user(me, criteria),
    service.get_new_associate_opportunity_count_by_user(me, criteria),
    service.async_get_associate_favorites_count(me, criteria=dict()),
    service.get_group_matched_opportunities_count_by_user(me, criteria),
]
available, new, favorites, group_matched = loop.run_until_complete(asyncio.gather(*tasks))
stats['opportunities']['available'] = available
stats['opportunities']['new'] = new
stats['opportunities']['favorites'] = favorites
stats['opportunities']['group_matched'] = group_matched
loop.close()
```

```python
# functions written in other file
import asyncio
import time


async def async_get_associate_opportunity_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    # mongoengine's count() is a regular (synchronous) call
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of available: {}".format(run_time))
    await asyncio.sleep(2)
    return opportunities


async def get_new_associate_opportunity_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of new: {}".format(run_time))
    await asyncio.sleep(2)
    return opportunities


async def async_get_associate_favorites_count(self, user, criteria=None, **kwargs):
    # criteria=None avoids sharing one mutable {} default between calls
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    favorites = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of favorites: {}".format(run_time))
    await asyncio.sleep(2)
    return favorites


async def get_group_matched_opportunities_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of group matched: {}".format(run_time))
    await asyncio.sleep(2)
    return opportunities
```
The `asyncio.sleep(2)` call is only there to show that the functions run asynchronously. Here is the output in the terminal:
```
runtime of group matched: 0.11431598663330078
runtime of favorites: 0.0029871463775634766
timestamp function runtime: 0.0004897117614746094
runtime of new: 0.15225648880004883
runtime of available: 0.13006806373596191
total runtime: 2403.2700061798096
```
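As I understand it, apart from the 2000 ms that the sleep calls add to the total run time, it should not exceed 155-160 ms, since that is the longest run time of any of the functions.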
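That expectation can be checked against the measured numbers. The short calculation below compares two hypotheses: if the counts overlapped, the total would be about 2000 ms plus the slowest count; if each count ran to completion before the next one started, it would be 2000 ms plus their sum. It assumes the total is reported in milliseconds and that the staggered 2-second sleeps overlap, so they add roughly 2000 ms once. The "timestamp function" line comes from code not shown in the question; it is included here, but at under 1 ms it does not change the result.

```python
# Per-function runtimes in seconds, copied from the terminal output above.
runtimes_s = {
    "group matched": 0.11431598663330078,
    "favorites": 0.0029871463775634766,
    "timestamp function": 0.0004897117614746094,
    "new": 0.15225648880004883,
    "available": 0.13006806373596191,
}
# Assumption: the 2-second sleeps overlap, so together they add ~2000 ms once.
sleep_ms = 2000

# Hypothesis 1: the counts overlap, so only the slowest one adds to the total.
concurrent_ms = sleep_ms + max(runtimes_s.values()) * 1000
# Hypothesis 2: each count blocks until done, so the counts add up.
sequential_ms = sleep_ms + sum(runtimes_s.values()) * 1000

observed_ms = 2403.2700061798096  # assumed to be milliseconds
print(f"concurrent estimate: {concurrent_ms:.1f} ms")
print(f"sequential estimate: {sequential_ms:.1f} ms")
print(f"observed:            {observed_ms:.1f} ms")
```

The sequential estimate (about 2400 ms) lands within a few milliseconds of the observed 2403 ms, while the concurrent estimate is about 2152 ms, which points to the counts running one after another.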
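I am currently looking into motorengine (a port of mongoengine 0.9.0), which apparently enables asynchronous MongoDB queries, but I don't think I'll be able to use it because my models are defined with mongoengine. Is there a workaround for this problem?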
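The reason your queries don't run in parallel is that whenever you run `Opportunity.objects(query).count()` inside a coroutine, the whole event loop blocks, because these methods perform blocking IO.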
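The effect can be reproduced without MongoDB. In this minimal sketch, `time.sleep` stands in (as an assumption) for a synchronous driver call like `Opportunity.objects(query).count()`, and `asyncio.sleep` for a call that yields control back to the event loop. The helpers `blocking_query`, `blocking_task`, `non_blocking_task` and `timed` are hypothetical names for illustration only.

```python
import asyncio
import time


def blocking_query(delay: float) -> float:
    # Stand-in for a synchronous call such as Opportunity.objects(query).count():
    # it holds the thread, and therefore the event loop, until it returns.
    time.sleep(delay)
    return delay


async def blocking_task(delay: float) -> float:
    # Calling blocking code inside a coroutine never yields to the loop.
    return blocking_query(delay)


async def non_blocking_task(delay: float) -> float:
    # asyncio.sleep suspends this coroutine so other tasks can run meanwhile.
    await asyncio.sleep(delay)
    return delay


async def timed(make_task, delays):
    # Run one task per delay concurrently and return the wall-clock time taken.
    start = time.perf_counter()
    await asyncio.gather(*(make_task(d) for d in delays))
    return time.perf_counter() - start


delays = [0.1, 0.15, 0.13]
blocking_s = asyncio.run(timed(blocking_task, delays))
non_blocking_s = asyncio.run(timed(non_blocking_task, delays))
print(f"blocking:     {blocking_s:.2f} s")
print(f"non-blocking: {non_blocking_s:.2f} s")
```

Even under `asyncio.gather`, the blocking version takes roughly the sum of the delays (about 0.38 s), while the non-blocking version takes roughly the longest single delay (about 0.15 s); exact timings vary by machine.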
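So you need a MongoDB driver that can do asynchronous, non-blocking IO. You are on the right track with motorengine, but as far as I know it is written for the Tornado async framework. To make it work with asyncio you would have to bridge Tornado and asyncio; see http://tornado.readthedocs.org/en/latest/asyncio.html for how to do this.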
Another option is to use asyncio-mongo, but it has no mongoengine-compatible ORM, so you would probably have to rewrite most of your code.
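A different workaround, not part of the answer above, keeps the existing mongoengine models: run each blocking query in a thread pool with `loop.run_in_executor`, so the event loop itself is never blocked. This is a sketch under assumptions: `count_opportunities` is a hypothetical stand-in for a call like `Opportunity.objects(query).count()`, with `time.sleep` simulating the database round trip and a placeholder return value. It relies on threads rather than true non-blocking IO; pymongo's `MongoClient` is thread-safe, which makes sharing one mongoengine connection across worker threads plausible, but check this against your own connection setup.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def count_opportunities(label: str, delay: float) -> int:
    # Hypothetical stand-in for a blocking mongoengine call such as
    # Opportunity.objects(query).count(); time.sleep simulates the round trip.
    time.sleep(delay)
    return len(label)  # placeholder result instead of a real count


async def gather_counts() -> dict:
    loop = asyncio.get_running_loop()
    jobs = {"available": 0.13, "new": 0.15, "favorites": 0.01, "group_matched": 0.11}
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        # Each blocking call runs in a worker thread; the event loop only
        # awaits the resulting futures, so the calls overlap.
        results = await asyncio.gather(*(
            loop.run_in_executor(pool, count_opportunities, name, delay)
            for name, delay in jobs.items()
        ))
    # gather preserves input order, so results line up with the job names.
    return dict(zip(jobs, results))


start = time.perf_counter()
counts = asyncio.run(gather_counts())
elapsed = time.perf_counter() - start
print(counts, f"{elapsed:.2f} s")
```

The total time is close to the slowest simulated query (about 0.15 s) rather than the sum (about 0.40 s). On Python 3.9+, `asyncio.to_thread(count_opportunities, name, delay)` is a shorter way to use the default executor.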