尝试使用 `asyncio` 模块实现 2 "threads"

Trying to implement 2 "threads" using `asyncio` module

我之前在 Python 中玩过线程,但决定尝试 asyncio 模块,特别是因为你可以取消 运行 任务,这看起来像一个很好的细节。但是,出于某种原因,我无法理解它。

这是我想要实现的(抱歉,如果我使用的术语不正确):

我使用 aiohttp 作为网络服务器。

这是我目前拥有的:

class aiotest():

    def __init__(self):
        self._dl = None     # downloader future
        self._webapp = None # web server future
        self.init_server()

    def init_server(self):

        print('Setting up web interface')
        app = web.Application()
        app.router.add_route('GET', '/stop', self.stop)
        print('added urls')
        self._webapp = app

    @asyncio.coroutine
    def _downloader(self):
        while True:
            try:
                print('Downloading and verifying file...')
                # Dummy sleep - to be replaced by actual code
                yield from asyncio.sleep(random.randint(3,10))
                # Wait a predefined nr of seconds between downloads
                yield from asyncio.sleep(30)
            except asyncio.CancelledError:
                break

    @asyncio.coroutine
    def _supervisor(self):

        print('Starting downloader')
        self._dl = asyncio.async(self._downloader())

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._supervisor())
        loop.close()

    @asyncio.coroutine
    def stop(self):
        print('Received STOP')
        self._dl.cancel()
        return web.Response(body=b"Stopping... ")

此 class 调用者:

t = aiotest()
t.start()

这当然不行,我觉得这是一段可怕的代码。

我不清楚的地方:

最后一个更普遍的问题:asyncio 是否应该取代 threading 模块(在未来)?还是每个都有自己的应用程序?

感谢所有的指点、评论和说明!

为什么当前代码不起作用:

  • 您正在 运行 宁事件循环,直到 self._supervisor() 完成。 self._supervisor() 创建任务(立即发生)并立即完成。

  • 您正在尝试 运行 事件循环直到 _supervisor 完成,但是您将如何以及何时启动服务器?我认为事件循环应该 运行ning 直到服务器停止。 _supervisor 或其他东西可以作为任务添加(到相同的事件循环)。 aiohttp 已经有启动服务器和事件循环的功能 - web.run_app,但我们可以做到 manually.

您的问题:

  1. 您的服务器将 运行 直到您停止它。您可以 start/stop 不同 服务器工作时的协程。

  2. 不同协程只需要一个事件循环。

  3. 我觉得你不需要supervisor.

  4. 更一般的问题:asyncio 帮助您 运行 不同 在单进程中的单线程中并行运行。这就是为什么 asyncio 非常酷而且速度很快。你的一些同步代码与你的线程 可以使用 asyncio 及其协程重写。此外:异步可以 interact 有线程和进程。 如果您仍然需要线程和进程,它会很有用:这里是 example.

有用的注释:

  • 当我们谈论非线程的 asyncio 协程时,最好使用术语 coroutine 而不是 thread
  • 如果使用Python3.5,可以使用async/awaitsyntax 而不是 coroutine/yield from

我重写了您的代码以向您展示想法。如何查看:运行 program, see console, open http://localhost:8080/stop, see console, open http://localhost:8080/start, see console, type CTRL+C.

import asyncio
import random
from contextlib import suppress

from aiohttp import web


class aiotest():
    def __init__(self):
        self._webapp = None
        self._d_task = None
        self.init_server()

    # SERVER:
    def init_server(self):
        app = web.Application()
        app.router.add_route('GET', '/start', self.start)
        app.router.add_route('GET', '/stop', self.stop)
        app.router.add_route('GET', '/kill_server', self.kill_server)
        self._webapp = app

    def run_server(self):
        # Create server:
        loop = asyncio.get_event_loop()
        handler = self._webapp.make_handler()
        f = loop.create_server(handler, '0.0.0.0', 8080)
        srv = loop.run_until_complete(f)
        try:
            # Start downloader at server start:
            asyncio.async(self.start(None))  # I'm using controllers here and below to be short,
                                             # but it's better to split controller and start func
            # Start server:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Stop downloader when server stopped:
            loop.run_until_complete(self.stop(None))
            # Cleanup resources:
            srv.close()
            loop.run_until_complete(srv.wait_closed())
            loop.run_until_complete(self._webapp.shutdown())
            loop.run_until_complete(handler.finish_connections(60.0))
            loop.run_until_complete(self._webapp.cleanup())
        loop.close()

    @asyncio.coroutine
    def kill_server(self, request):
        print('Server killing...')
        loop = asyncio.get_event_loop()
        loop.stop()
        return web.Response(body=b"Server killed")

    # DOWNLOADER
    @asyncio.coroutine
    def start(self, request):
        if self._d_task is None:
            print('Downloader starting...')
            self._d_task = asyncio.async(self._downloader())
            return web.Response(body=b"Downloader started")
        else:
            return web.Response(body=b"Downloader already started")

    @asyncio.coroutine
    def stop(self, request):
        if (self._d_task is not None) and (not self._d_task.cancelled()):
            print('Downloader stopping...')
            self._d_task.cancel()            
            # cancel() just say task it should be cancelled
            # to able task handle CancelledError await for it
            with suppress(asyncio.CancelledError):
                yield from self._d_task
            self._d_task = None
            return web.Response(body=b"Downloader stopped")
        else:
            return web.Response(body=b"Downloader already stopped or stopping")

    @asyncio.coroutine
    def _downloader(self):
        while True:
            print('Downloading and verifying file...')
            # Dummy sleep - to be replaced by actual code
            yield from asyncio.sleep(random.randint(1, 2))
            # Wait a predefined nr of seconds between downloads
            yield from asyncio.sleep(1)


if __name__ == '__main__':
    t = aiotest()
    t.run_server()