尝试使用 `asyncio` 模块实现 2 "threads"
Trying to implement 2 "threads" using `asyncio` module
我之前在 Python 中玩过线程,但决定尝试 asyncio
模块,特别是因为你可以取消 运行 任务,这看起来像一个很好的细节。但是,出于某种原因,我无法理解它。
这是我想要实现的(抱歉,如果我使用的术语不正确):
- 一个
downloader
线程,每 x 秒下载同一个文件,检查其哈希与之前下载的哈希值,如果不同则保存它。
- 在后台运行的
webserver
线程,允许控制(暂停、列出、停止)downloader
线程。
我使用 aiohttp
作为网络服务器。
这是我目前拥有的:
class aiotest():
def __init__(self):
self._dl = None # downloader future
self._webapp = None # web server future
self.init_server()
def init_server(self):
print('Setting up web interface')
app = web.Application()
app.router.add_route('GET', '/stop', self.stop)
print('added urls')
self._webapp = app
@asyncio.coroutine
def _downloader(self):
while True:
try:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(3,10))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(30)
except asyncio.CancelledError:
break
@asyncio.coroutine
def _supervisor(self):
print('Starting downloader')
self._dl = asyncio.async(self._downloader())
def start(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._supervisor())
loop.close()
@asyncio.coroutine
def stop(self):
print('Received STOP')
self._dl.cancel()
return web.Response(body=b"Stopping... ")
此 class 调用者:
t = aiotest()
t.start()
这当然不行,我觉得这是一段可怕的代码。
我不清楚的地方:
- 我在
stop()
方法中停止了 downloader
,但是我将如何停止网络服务器(例如在 shutdown()
方法中)?
downloader
是否需要新的事件循环,或者我可以使用 asyncio.get_event_loop()
返回的循环吗?
- 我真的需要像
supervisor
这样的东西来实现我想要实现的东西吗?这看起来很笨拙。我如何让 supervisor
保持 运行 而不是像现在这样在单次执行后结束?
最后一个更普遍的问题:asyncio
是否应该取代 threading
模块(在未来)?还是每个都有自己的应用程序?
感谢所有的指点、评论和说明!
为什么当前代码不起作用:
您正在 运行 宁事件循环,直到 self._supervisor()
完成。 self._supervisor()
创建任务(立即发生)并立即完成。
您正在尝试 运行 事件循环直到 _supervisor
完成,但是您将如何以及何时启动服务器?我认为事件循环应该 运行ning 直到服务器停止。 _supervisor
或其他东西可以作为任务添加(到相同的事件循环)。 aiohttp
已经有启动服务器和事件循环的功能 - web.run_app
,但我们可以做到 manually.
您的问题:
您的服务器将 运行 直到您停止它。您可以 start/stop 不同
服务器工作时的协程。
不同协程只需要一个事件循环。
我觉得你不需要supervisor
.
更一般的问题:asyncio
帮助您 运行 不同
在单进程中的单线程中并行运行。这就是为什么
asyncio 非常酷而且速度很快。你的一些同步代码与你的线程
可以使用 asyncio 及其协程重写。此外:异步可以
interact 有线程和进程。
如果您仍然需要线程和进程,它会很有用:这里是 example.
有用的注释:
- 当我们谈论非线程的 asyncio 协程时,最好使用术语
coroutine
而不是 thread
- 如果使用Python3.5,可以使用
async
/await
syntax
而不是 coroutine
/yield from
我重写了您的代码以向您展示想法。如何查看:运行 program, see console, open http://localhost:8080/stop
, see console, open http://localhost:8080/start
, see console, type CTRL+C.
import asyncio
import random
from contextlib import suppress
from aiohttp import web
class aiotest():
def __init__(self):
self._webapp = None
self._d_task = None
self.init_server()
# SERVER:
def init_server(self):
app = web.Application()
app.router.add_route('GET', '/start', self.start)
app.router.add_route('GET', '/stop', self.stop)
app.router.add_route('GET', '/kill_server', self.kill_server)
self._webapp = app
def run_server(self):
# Create server:
loop = asyncio.get_event_loop()
handler = self._webapp.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
try:
# Start downloader at server start:
asyncio.async(self.start(None)) # I'm using controllers here and below to be short,
# but it's better to split controller and start func
# Start server:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
# Stop downloader when server stopped:
loop.run_until_complete(self.stop(None))
# Cleanup resources:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(self._webapp.shutdown())
loop.run_until_complete(handler.finish_connections(60.0))
loop.run_until_complete(self._webapp.cleanup())
loop.close()
@asyncio.coroutine
def kill_server(self, request):
print('Server killing...')
loop = asyncio.get_event_loop()
loop.stop()
return web.Response(body=b"Server killed")
# DOWNLOADER
@asyncio.coroutine
def start(self, request):
if self._d_task is None:
print('Downloader starting...')
self._d_task = asyncio.async(self._downloader())
return web.Response(body=b"Downloader started")
else:
return web.Response(body=b"Downloader already started")
@asyncio.coroutine
def stop(self, request):
if (self._d_task is not None) and (not self._d_task.cancelled()):
print('Downloader stopping...')
self._d_task.cancel()
# cancel() just say task it should be cancelled
# to able task handle CancelledError await for it
with suppress(asyncio.CancelledError):
yield from self._d_task
self._d_task = None
return web.Response(body=b"Downloader stopped")
else:
return web.Response(body=b"Downloader already stopped or stopping")
@asyncio.coroutine
def _downloader(self):
while True:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(1, 2))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(1)
if __name__ == '__main__':
t = aiotest()
t.run_server()
我之前在 Python 中玩过线程,但决定尝试 asyncio
模块,特别是因为你可以取消 运行 任务,这看起来像一个很好的细节。但是,出于某种原因,我无法理解它。
这是我想要实现的(抱歉,如果我使用的术语不正确):
- 一个
downloader
线程,每 x 秒下载同一个文件,检查其哈希与之前下载的哈希值,如果不同则保存它。 - 在后台运行的
webserver
线程,允许控制(暂停、列出、停止)downloader
线程。
我使用 aiohttp
作为网络服务器。
这是我目前拥有的:
class aiotest():
def __init__(self):
self._dl = None # downloader future
self._webapp = None # web server future
self.init_server()
def init_server(self):
print('Setting up web interface')
app = web.Application()
app.router.add_route('GET', '/stop', self.stop)
print('added urls')
self._webapp = app
@asyncio.coroutine
def _downloader(self):
while True:
try:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(3,10))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(30)
except asyncio.CancelledError:
break
@asyncio.coroutine
def _supervisor(self):
print('Starting downloader')
self._dl = asyncio.async(self._downloader())
def start(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._supervisor())
loop.close()
@asyncio.coroutine
def stop(self):
print('Received STOP')
self._dl.cancel()
return web.Response(body=b"Stopping... ")
此 class 调用者:
t = aiotest()
t.start()
这当然不行,我觉得这是一段可怕的代码。
我不清楚的地方:
- 我在
stop()
方法中停止了downloader
,但是我将如何停止网络服务器(例如在shutdown()
方法中)? downloader
是否需要新的事件循环,或者我可以使用asyncio.get_event_loop()
返回的循环吗?- 我真的需要像
supervisor
这样的东西来实现我想要实现的东西吗?这看起来很笨拙。我如何让supervisor
保持 运行 而不是像现在这样在单次执行后结束?
最后一个更普遍的问题:asyncio
是否应该取代 threading
模块(在未来)?还是每个都有自己的应用程序?
感谢所有的指点、评论和说明!
为什么当前代码不起作用:
您正在 运行 宁事件循环,直到
self._supervisor()
完成。self._supervisor()
创建任务(立即发生)并立即完成。您正在尝试 运行 事件循环直到
_supervisor
完成,但是您将如何以及何时启动服务器?我认为事件循环应该 运行ning 直到服务器停止。_supervisor
或其他东西可以作为任务添加(到相同的事件循环)。aiohttp
已经有启动服务器和事件循环的功能 -web.run_app
,但我们可以做到 manually.
您的问题:
您的服务器将 运行 直到您停止它。您可以 start/stop 不同 服务器工作时的协程。
不同协程只需要一个事件循环。
我觉得你不需要
supervisor
.更一般的问题:
asyncio
帮助您 运行 不同 在单进程中的单线程中并行运行。这就是为什么 asyncio 非常酷而且速度很快。你的一些同步代码与你的线程 可以使用 asyncio 及其协程重写。此外:异步可以 interact 有线程和进程。 如果您仍然需要线程和进程,它会很有用:这里是 example.
有用的注释:
- 当我们谈论非线程的 asyncio 协程时,最好使用术语
coroutine
而不是thread
- 如果使用Python3.5,可以使用
async
/await
syntax 而不是coroutine
/yield from
我重写了您的代码以向您展示想法。如何查看:运行 program, see console, open http://localhost:8080/stop
, see console, open http://localhost:8080/start
, see console, type CTRL+C.
import asyncio
import random
from contextlib import suppress
from aiohttp import web
class aiotest():
def __init__(self):
self._webapp = None
self._d_task = None
self.init_server()
# SERVER:
def init_server(self):
app = web.Application()
app.router.add_route('GET', '/start', self.start)
app.router.add_route('GET', '/stop', self.stop)
app.router.add_route('GET', '/kill_server', self.kill_server)
self._webapp = app
def run_server(self):
# Create server:
loop = asyncio.get_event_loop()
handler = self._webapp.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
try:
# Start downloader at server start:
asyncio.async(self.start(None)) # I'm using controllers here and below to be short,
# but it's better to split controller and start func
# Start server:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
# Stop downloader when server stopped:
loop.run_until_complete(self.stop(None))
# Cleanup resources:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(self._webapp.shutdown())
loop.run_until_complete(handler.finish_connections(60.0))
loop.run_until_complete(self._webapp.cleanup())
loop.close()
@asyncio.coroutine
def kill_server(self, request):
print('Server killing...')
loop = asyncio.get_event_loop()
loop.stop()
return web.Response(body=b"Server killed")
# DOWNLOADER
@asyncio.coroutine
def start(self, request):
if self._d_task is None:
print('Downloader starting...')
self._d_task = asyncio.async(self._downloader())
return web.Response(body=b"Downloader started")
else:
return web.Response(body=b"Downloader already started")
@asyncio.coroutine
def stop(self, request):
if (self._d_task is not None) and (not self._d_task.cancelled()):
print('Downloader stopping...')
self._d_task.cancel()
# cancel() just say task it should be cancelled
# to able task handle CancelledError await for it
with suppress(asyncio.CancelledError):
yield from self._d_task
self._d_task = None
return web.Response(body=b"Downloader stopped")
else:
return web.Response(body=b"Downloader already stopped or stopping")
@asyncio.coroutine
def _downloader(self):
while True:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(1, 2))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(1)
if __name__ == '__main__':
t = aiotest()
t.run_server()