在块中持久化和获取数据
Persist and fetch data in with block
我有一种情况 - 我正在使用 asyncio
包和 Python 3.x,并将数据保存在 with
块中,如下所示:
test_repo = TestRepository()
with (yield from test_repo):
res = yield from test_repo.get_by_lim_off(
page_size=int(length),
offset=start,
customer_name=customer_name,
customer_phone=customer_phone,
return_type=return_type
)
我需要在 with
块中获取 res
数据,但是当我从 with
块退出时应该发生持久性和获取数据。我怎样才能做到这一点?
此行为仅在 Python 3.5+ 中受支持,通过异步上下文管理器 (__aenter__
/__aexit__
) 和 async with
,两者均已添加到PEP 492:
class TestRepository:
# All your normal methods go here
async def __aenter__(self):
# You can call coroutines here
await self.some_init()
async def __aexit__(self, exc_type, exc, tb):
# You can call coroutines here
await self.do_persistence()
await self.fetch_data()
async def do_work():
test_repo = TestRepository()
async with test_repo:
res = await test_repo.get_by_lim_off(
page_size=int(length),
offset=start,
customer_name=customer_name,
customer_phone=customer_phone,
return_type=return_type
)
asyncio.get_event_loop().run_until_complete(do_work())
在 3.5 之前,您必须使用 try
/finally
块显式调用 init/cleanup 协程,不幸的是:
@asyncio.coroutine
def do_work():
test_repo = TestRepository()
yield from test_repo.some_init()
try:
res = yield from test_repo.get_by_lim_off(
page_size=int(length),
offset=start,
customer_name=customer_name,
customer_phone=customer_phone,
return_type=return_type
)
finally:
yield from test_repo.do_persistence()
yield from test_repo.fetch_data()
我有一种情况 - 我正在使用 asyncio
包和 Python 3.x,并将数据保存在 with
块中,如下所示:
test_repo = TestRepository()
with (yield from test_repo):
res = yield from test_repo.get_by_lim_off(
page_size=int(length),
offset=start,
customer_name=customer_name,
customer_phone=customer_phone,
return_type=return_type
)
我需要在 with
块中获取 res
数据,但是当我从 with
块退出时应该发生持久性和获取数据。我怎样才能做到这一点?
此行为仅在 Python 3.5+ 中受支持,通过异步上下文管理器 (__aenter__
/__aexit__
) 和 async with
,两者均已添加到PEP 492:
class TestRepository:
# All your normal methods go here
async def __aenter__(self):
# You can call coroutines here
await self.some_init()
async def __aexit__(self, exc_type, exc, tb):
# You can call coroutines here
await self.do_persistence()
await self.fetch_data()
async def do_work():
test_repo = TestRepository()
async with test_repo:
res = await test_repo.get_by_lim_off(
page_size=int(length),
offset=start,
customer_name=customer_name,
customer_phone=customer_phone,
return_type=return_type
)
asyncio.get_event_loop().run_until_complete(do_work())
在 3.5 之前,您必须使用 try
/finally
块显式调用 init/cleanup 协程,不幸的是:
@asyncio.coroutine
def do_work():
test_repo = TestRepository()
yield from test_repo.some_init()
try:
res = yield from test_repo.get_by_lim_off(
page_size=int(length),
offset=start,
customer_name=customer_name,
customer_phone=customer_phone,
return_type=return_type
)
finally:
yield from test_repo.do_persistence()
yield from test_repo.fetch_data()