Running coroutines and threads in asyncio event loop: Errors while exiting
I've designed a piece of Python code that essentially works as a microservice within a larger scheme.
I schedule two tasks on the event loop and set two more tasks running in the executor.
The strange thing is that the code runs fine and does everything I expect. But when I end it with a KeyboardInterrupt (Ctrl+C), I see errors and exceptions, which makes me think I must be misusing the asyncio patterns somewhere. I'll try to give a brief overview of the code without diving straight into lengthy details:
class Prototype:
    def _redis_subscriber(self):
        self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
        self._p.subscribe("channel1")
        while True:
            pubbed_msg = self._p.get_message()
            if pubbed_msg is not None:
                # process process process
            time.sleep(0.01)

    def _generic_worker_on_internal_q(self):
        while True:
            item = self.q.get()  # blocking call
            # process item

    async def task1(self):
        # network I/O bound code

    async def task2(self):
        # network I/O bound code. also fills with self.q.put()

    def run(self):
        asyncio.ensure_future(self.task1(), loop=self._event_loop)
        asyncio.ensure_future(self.task2(), loop=self._event_loop)
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q))
        self._event_loop.run_forever()

if __name__ == '__main__':
    p = Prototype()
    p.run()
I also tried another approach in the Prototype.run() method:
def __init__(self):
    self._tasks = []

def run(self):
    self._tasks.append(asyncio.ensure_future(self._task1()))
    self._tasks.append(asyncio.ensure_future(self._task2()))
    self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber)))
    self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q)))
    self._event_loop.run_until_complete(self._tasks)
Either way, when I try to end the running script with Ctrl+C, it doesn't exit on the first attempt; I have to press it twice. This is what comes up:
KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Exception ignored in: <bound method BaseEventLoop.call_exception_handler of <_UnixSelectorEventLoop running=False closed=False debug=False>>
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 1296, in call_exception_handler
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1335, in error
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1442, in _log
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1452, in handle
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1514, in callHandlers
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 863, in handle
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1069, in emit
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1059, in _open
NameError: name 'open' is not defined
Where am I going wrong?
You scheduled two infinite tasks in the executor, and those tasks are blocking exit.
The default executor puts them in threads that take work from an internal queue; at exit, that queue is signalled so the threads stop executing tasks. But if your task never returns, the queue manager never gets a chance to check for that signal.
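To see those mechanics in isolation, here is a minimal, self-contained sketch (not taken from your code) of a never-returning executor task: the event loop itself stops fine, but the process then hangs in the atexit hook while concurrent.futures tries to join the still-running worker thread, which is exactly the frame in your traceback:

import asyncio
import time

def never_returns():
    # infinite loop, so this executor thread never finishes
    while True:
        time.sleep(1)

loop = asyncio.get_event_loop()
loop.run_in_executor(None, never_returns)  # submitted to the default ThreadPoolExecutor
loop.call_later(2, loop.stop)              # stop the event loop after 2 seconds
loop.run_forever()
# The script reaches this point, but the interpreter then blocks in the
# concurrent.futures atexit handler, which calls t.join() on the worker thread.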
You can avoid that state by not running an infinite loop. Instead, re-schedule your task each time it reaches the end, and don't block while receiving messages:
def _redis_subscriber(self):
    self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
    self._p.subscribe("channel1")

    def process_message():
        # non-blocking task to repeatedly run in the executor
        pubbed_msg = self._p.get_message(False)
        if pubbed_msg is not None:
            # process process process
        time.sleep(0.01)
        # reschedule function for next message
        asyncio.ensure_future(self._event_loop.run_in_executor(None, process_message))

    # kick off first handler
    process_message()
You still kick off this function in the executor:
def run(self):
    # ...
    asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))
Do the same for _generic_worker_on_internal_q(), and make sure to avoid the blocking call to Queue.get(); use self.q.get(False) instead.
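For example, an illustrative sketch of that worker (assuming, as in your class, that self.q is a standard queue.Queue and self._event_loop is the loop used in run()) could look like this:

import queue

def _generic_worker_on_internal_q(self):
    def process_item():
        # non-blocking task to repeatedly run in the executor
        try:
            item = self.q.get(False)  # non-blocking get; raises queue.Empty
        except queue.Empty:
            pass
        else:
            # process item
            pass
        time.sleep(0.01)
        # reschedule function for the next item
        asyncio.ensure_future(
            self._event_loop.run_in_executor(None, process_item))

    # kick off first handler
    process_item()

Note that q.get(False) raises queue.Empty immediately when there is nothing to process, so handle that case rather than blocking on the call.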
You could even use a decorator for this:
import asyncio
from functools import partial, wraps

def auto_reschedule(loop=None, executor=None):
    """Repeatedly re-schedule function in the given executor"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            result = f(*args, **kwargs)
            callable = wrapper
            if args or kwargs:
                callable = partial(callable, *args, **kwargs)
            current_loop = loop
            if current_loop is None:
                current_loop = asyncio.get_event_loop()
            current_loop.run_in_executor(executor, callable)
            return result
        return wrapper
    return decorator
and use this decorator on your inner function, where you have access to the instance attribute that references the loop:
def _redis_subscriber(self):
    self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
    self._p.subscribe("channel1")

    @auto_reschedule(self._event_loop)
    def process_message():
        # non-blocking task to repeatedly run in the executor
        pubbed_msg = self._p.get_message(False)
        if pubbed_msg is not None:
            # process process process
        time.sleep(0.01)

    # kick off first handler
    process_message()
A quick demo of the latter:
import asyncio
import time
import random

# auto_reschedule imported or defined

def create_thread_task(i, loop):
    @auto_reschedule(loop)
    def thread_task():
        print(f'Task #{i} running in worker')
        time.sleep(random.uniform(1, 3))
    return thread_task

def main():
    loop = asyncio.get_event_loop()
    for i in range(5):
        asyncio.ensure_future(
            loop.run_in_executor(None, create_thread_task(i, loop)))
    loop.run_forever()

if __name__ == '__main__':
    main()