如何使用 Motor 干净地关闭 Change Streams?
How to cleanly shutdown Change Streams with Motor?
TL; DR
这确实是 Motor 1.2.0 中的一个错误,由 A. Jesse Jiryu Davis 迅速修复,并且在 1.2.1 或更高版本的驱动程序中可用。
原题
我在 Python 3 上使用其新的 Change Stream 功能编写了一个程序来监视 MongoDB 集合的更改。这是 MCVE:
from asyncio import get_event_loop, CancelledError
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient
async def watch(collection):
async with collection.watch([]) as stream:
async for change in stream:
print(change)
async def cleanup():
task.cancel()
with suppress(CancelledError):
await task
if __name__ == '__main__':
conn = AsyncIOMotorClient()
loop = get_event_loop()
task = loop.create_task(watch(conn.database.collection)) # Replace with a real collection.
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(cleanup())
loop.shutdown_asyncgens()
loop.close()
当我用 CTRL+C 终止程序时,它引发了三个不同的异常。
^Cexception calling callback for <Future at 0x102efea58 state=finished raised InvalidStateError>
Traceback (most recent call last):
File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1259, in _next
change = self.delegate.next()
File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/change_stream.py", line 79, in next
change = self._cursor.next()
File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/command_cursor.py", line 292, in next
raise StopIteration
StopIteration
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1264, in _next
future.set_exception(StopAsyncIteration())
asyncio.base_futures.InvalidStateError: invalid state
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
有没有办法让该程序静默关闭?
我正在 macOS Sierra 上使用 Python 3.6.4、Motor 1.2 和 pymongo 3.6.0 进行测试。
我认为你的代码是正确的,问题在motor
这边。
在调查过程中我发现了两个问题:
- 如果此时您关闭循环连接尚未建立,您将收到
exception calling callback for <Future
错误,因为循环在异步回调完成之前关闭。它似乎与异步生成器或流无关,但与任何 motor
用法有关。
AgnosticChangeStream
异步迭代机制(_next function)在取消时没有考虑大小写。尝试设置例外以取消未来导致 InvalidStateError
.
此代码演示了两个问题和可能的解决方法:
import types
import asyncio
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient
async def test():
while True:
await asyncio.sleep(0.1)
async def cleanup(task):
task.cancel()
with suppress(asyncio.CancelledError):
await task
def _next(self, future):
try:
if not self.delegate:
self.delegate = self._collection.delegate.watch(**self._kwargs)
change = self.delegate.next()
self._framework.call_soon(self.get_io_loop(),
future.set_result,
change)
except StopIteration:
future.set_exception(StopAsyncIteration())
except Exception as exc:
# CASE 2:
# Cancellation of async iteration (and future with it) happens immediately
# and trying to set exception to cancelled future leads to InvalidStateError,
# we should prevent it:
if future.cancelled():
return
future.set_exception(exc)
async def watch(collection):
async with collection.watch([]) as stream:
# Patch stream to achieve CASE 2:
stream._next = types.MethodType(_next, stream)
async for change in stream:
print(change)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tmp = asyncio.ensure_future(test()) # Way to receive KeyboardInterrupt immediately.
client = AsyncIOMotorClient()
collection = client.test_database.test_collection
task = asyncio.ensure_future(watch(collection))
try:
loop.run_forever()
except KeyboardInterrupt:
print('KeyboardInterrupt')
finally:
loop.run_until_complete(cleanup(tmp))
loop.run_until_complete(cleanup(task))
# CASE 1:
# Looks like propagating KeyboardInterrupt doesn't affect motor's try
# to establish connection to db and I didn't find a way to stop this manually.
# We should keep event loop alive until we receive ServerSelectionTimeoutError
# and motor would be able to execute it's asyncio callbacks:
loop.run_until_complete(asyncio.sleep(client.server_selection_timeout))
loop.shutdown_asyncgens()
loop.close()
由于添加了修复程序,它在没有 warnings/exceptions 的情况下完成(至少在我的机器上)。
我不建议你使用上面的技巧!它只是为了演示问题的地方和可能的解决方案。我不确定它是否正确执行所有操作。
相反,我建议您 create issue at motor user group / Jira 将您的代码段和我的回答附加到那里,然后等到错误被修复。
TL; DR
这确实是 Motor 1.2.0 中的一个错误,由 A. Jesse Jiryu Davis 迅速修复,并且在 1.2.1 或更高版本的驱动程序中可用。
原题
我在 Python 3 上使用其新的 Change Stream 功能编写了一个程序来监视 MongoDB 集合的更改。这是 MCVE:
from asyncio import get_event_loop, CancelledError
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient
async def watch(collection):
async with collection.watch([]) as stream:
async for change in stream:
print(change)
async def cleanup():
task.cancel()
with suppress(CancelledError):
await task
if __name__ == '__main__':
conn = AsyncIOMotorClient()
loop = get_event_loop()
task = loop.create_task(watch(conn.database.collection)) # Replace with a real collection.
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(cleanup())
loop.shutdown_asyncgens()
loop.close()
当我用 CTRL+C 终止程序时,它引发了三个不同的异常。
^Cexception calling callback for <Future at 0x102efea58 state=finished raised InvalidStateError>
Traceback (most recent call last):
File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1259, in _next
change = self.delegate.next()
File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/change_stream.py", line 79, in next
change = self._cursor.next()
File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/command_cursor.py", line 292, in next
raise StopIteration
StopIteration
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1264, in _next
future.set_exception(StopAsyncIteration())
asyncio.base_futures.InvalidStateError: invalid state
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
有没有办法让该程序静默关闭?
我正在 macOS Sierra 上使用 Python 3.6.4、Motor 1.2 和 pymongo 3.6.0 进行测试。
我认为你的代码是正确的,问题在motor
这边。
在调查过程中我发现了两个问题:
- 如果此时您关闭循环连接尚未建立,您将收到
exception calling callback for <Future
错误,因为循环在异步回调完成之前关闭。它似乎与异步生成器或流无关,但与任何motor
用法有关。 AgnosticChangeStream
异步迭代机制(_next function)在取消时没有考虑大小写。尝试设置例外以取消未来导致InvalidStateError
.
此代码演示了两个问题和可能的解决方法:
import types
import asyncio
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient
async def test():
while True:
await asyncio.sleep(0.1)
async def cleanup(task):
task.cancel()
with suppress(asyncio.CancelledError):
await task
def _next(self, future):
try:
if not self.delegate:
self.delegate = self._collection.delegate.watch(**self._kwargs)
change = self.delegate.next()
self._framework.call_soon(self.get_io_loop(),
future.set_result,
change)
except StopIteration:
future.set_exception(StopAsyncIteration())
except Exception as exc:
# CASE 2:
# Cancellation of async iteration (and future with it) happens immediately
# and trying to set exception to cancelled future leads to InvalidStateError,
# we should prevent it:
if future.cancelled():
return
future.set_exception(exc)
async def watch(collection):
async with collection.watch([]) as stream:
# Patch stream to achieve CASE 2:
stream._next = types.MethodType(_next, stream)
async for change in stream:
print(change)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tmp = asyncio.ensure_future(test()) # Way to receive KeyboardInterrupt immediately.
client = AsyncIOMotorClient()
collection = client.test_database.test_collection
task = asyncio.ensure_future(watch(collection))
try:
loop.run_forever()
except KeyboardInterrupt:
print('KeyboardInterrupt')
finally:
loop.run_until_complete(cleanup(tmp))
loop.run_until_complete(cleanup(task))
# CASE 1:
# Looks like propagating KeyboardInterrupt doesn't affect motor's try
# to establish connection to db and I didn't find a way to stop this manually.
# We should keep event loop alive until we receive ServerSelectionTimeoutError
# and motor would be able to execute it's asyncio callbacks:
loop.run_until_complete(asyncio.sleep(client.server_selection_timeout))
loop.shutdown_asyncgens()
loop.close()
由于添加了修复程序,它在没有 warnings/exceptions 的情况下完成(至少在我的机器上)。
我不建议你使用上面的技巧!它只是为了演示问题的地方和可能的解决方案。我不确定它是否正确执行所有操作。
相反,我建议您 create issue at motor user group / Jira 将您的代码段和我的回答附加到那里,然后等到错误被修复。