Python:通过 select.epoll() 检索多个 URL

Python: retrieve several URLs via select.epoll()

我有一个面向事件的服务器,它已经在使用 select.epoll()

现在应该解决一个新要求:应该获取 URL(异步)。

到目前为止,我一直使用请求库,而且我总是同步使用它,从不使用异步。

如何将请求库(或不同的 urllib)与 linux epoll 结合使用?

requests library docs 对此有注释,但只提到了异步框架(没有 select.epoll()):http://docs.python-requests.org/en/master/user/advanced/#blocking-or-non-blocking

我没有和 select.epoll() 结婚。它工作到现在。如果可行,我可以使用不同的解决方案。

背景:更大的问题是“我应该使用 select.epoll() 还是 python 拥有的众多异步框架之一?”。但是 Whosebug 上的问题一定不能太宽泛。这就是为什么这个问题的重点是“通过 select.epoll() 检索多个 URL”。如果您对更大的问题有提示,请发表评论。

如果你很好奇,我业余时间开发的一个小项目需要这个问题:https://github.com/guettli/ipo(IPO是一个基于PostgreSQL的开源异步作业队列。)

如何将请求库(或不同的 urllib)与 linux epoll 结合使用?

不幸的是,除非在构建这样的库时考虑到了这种集成,否则你不能这样做。 epoll,以及select/poll/kqueue等是I/O多路复用系统调用,整体程序架构需要围绕它构建。

简单来说,一个典型的程序结构可以归结为如下

  • 需要一堆文件描述符(在你的情况下是非阻塞模式的套接字)
  • 一个系统调用(man epoll_waitepoll 的情况下)阻塞直到一个或多个指定的事件发生描述符
  • 返回可用于I/O的描述符信息

之后,这是外部代码处理这些描述符的工作,即计算出有多少数据可用,调用一些回调等。

如果库使用常规 blocking 套接字,并行化它的唯一方法是使用 threads/processes 这是一个很好的主题 article,示例使用 C,这很好,因为它更容易理解引擎盖下实际发生的事情

异步框架和请求库

让我们看看有什么建议here

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python's asynchronicity frameworks. Some excellent examples are requests-threads, grequests, and requests-futures).

请求线程 - 使用线程

grequests - 与 gevent 集成(这是一个不同的故事,见下文)

requests-futures - 事实上也是threads/processes

它们都与真正的异步性无关

我应该使用 select.epoll() 还是 python 具有的众多异步框架之一

请注意,epolllinux-特定的 野兽,它不会工作,即在 OS X 具有称为 kqueue 的不同机制。由于您似乎正在编写通用作业队列,因此它似乎不是一个好的解决方案。

现在回到 python。您有以下选项:

threads/processes/concurrent.futures - 这不太可能是您的目标,因为您的应用是典型的 C10K 服务器

epoll/kqueue - 你必须自己做所有事情。在获取 HTTP url 的情况下,您不仅需要处理 http/ssl,还需要处理异步 DNS 解析。还可以考虑使用 asyncore[] 提供一些基本基础设施

twisted/tornado - 基于回调的框架已经为您完成了所有底层工作

gevent - 如果您要重用现有的阻塞库(urllib、请求等)并同时使用 python [=125,这可能是您喜欢的东西=] 和 python 3.x。但这个解决方案是设计上的黑客攻击。对于像你这样大小的应用程序,它可能没问题,但我不会将它用于任何更大的应用程序,它应该是坚如磐石的,并且 运行 in prod

asyncio

This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives

它拥有您可能需要的一切。 还有一些库使用流行的 RDBM 和 http https://github.com/aio-libs

但是缺少python2.x的支持。有 ports 的 asyncio 到 python 2.x 但不确定它们有多稳定

终于

所以如果我可以牺牲 python 2.x 我个人会选择 asyncio 和相关库

如果您确实确实需要 python 2.x,请根据所需的稳定性和假设的峰值负载使用上述方法之一

在进行高性能开发时,我们总是根据我们的选择武器situation.So它仍然太宽泛无法回答。

但你的大问题更简单one.only IO 绑定程序适合异步。

epoll和asynchronous的目的是什么?避免CPU等待IO和做nothing.CPU等待IO块,IO块因为没有数据读取或没有space写。

Buffer的引入是为了减少系统call.When你在stream上调用read,实际上是从buffer中读取。(概念,不是很准确)

Select 或 epoll 是非阻塞的忙轮询(epoll 通过中断底层实现)。它基本上类似于下面的东西

while true {
  for i in stream[]{
    if i has data
          read until unavailable
    }
}

太傻了,所以才有了select和epoll。 每次你从buffer读取,都有数据等着你,是高速IO,那么epoll/select是你最好的choice.And当buffer总是空的时候,它是一个慢流,IO-bound,async是很适合这种情况。

我不太了解异步,对我来说它只是内部的软中断和大量回调。

上面的要点是正确的,从技术上讲,您不能使用针对多路复用 I/O 的阻塞调用来执行此操作,例如 select()epoll() 和 BSD/iOS , Windows 个变体。这些调用允许超时规范,因此您可以通过在短时间间隔内重复轮询来接近,然后将工作传递给主线程之外的异步处理程序。在那种情况下,读取是在主线程上完成的,多次读取可以表示它们已准备就绪,并且主线程主要用于该任务。

如果您的问题规模从小到中等,那么没有什么比 epoll()...read() 甚至 select()...read() 更好的了。如果您的问题(读取通道数)偏小。所以我鼓励你考虑一下——从主线程中获得尽可能多的工作,这些工作可以专门用于请求。

如果您正在寻找异步解决方案,最好的选择之一是 grequests 库,它既易于使用又具有性能。要获得一个想法,运行 以下客户端-服务器对。请注意,tornado 的使用在这里无关紧要,仅在服务器端,而您关心的是客户端。

试试这个 - 性能差异是白天和黑夜。

下面的client.pyclass为您提供了一个解决方案;它使用 grequests 异步发出 get() 请求。

server.py

from tornado import (httpserver, options,
                     ioloop, web, gen)
import time

import ujson as json
from collections import defaultdict

class Check(web.RequestHandler):

    @gen.coroutine
    def get(self):
        try:
            data = int(self.get_argument('data'))
        except ValueError:
            raise web.HTTPError(400, reason='Invalid value for data')

        delay = 100
        start = time.time()
        print('Processed: {!r}'.format(data))

       yield gen.Task(ioloop.IOLoop.instance().add_timeout, start + delay / 1000.)

        self.write('.')
        end = time.time()
        self.finish()


if __name__ == '__main__':
    port = 4545

    application = web.Application([
        (r'/get', Check)
        ])

    http_server = httpserver.HTTPServer(application)
    http_server.listen(port)
    print('Listening on port: {}'.format(port))
    ioloop.IOLoop.instance().start()

client.py

import grequests
from tornado.httpclient import HTTPClient
import time

def call_serial(num, httpclient):
    url = 'http://127.0.0.1:4545/get?data={}'.format(num)
    response = httpclient.fetch(url)
    print('Added: {!r}'.format(num))

def call_async(mapper):
    futures = (grequests.get(url) for url,_ in mapper)
    responses = grequests.map(futures)
    for response, (url,num) in zip(responses, mapper):
        print('Added: {!r}'.format(num))

def check(num):
    if num % 2 == 0:
        return False
    return True

def serial_calls(httpclient, up_to):
    for num in range(up_to):
        if check(num):
            call_serial(num, httpclient)

def async_calls(httpclient, up_to):
    mapper = []

    for num in range(up_to):
        if check(num):
            url = 'http://127.0.0.1:4545/get?data={}'.format(num)    
            mapper.append((url,num))

    call_async(mapper)


if __name__ == '__main__':

    httpclient = HTTPClient()

    print('SERIAL CALLS')
    serial_calls(httpclient, 100)

    print('ASYNC CALLS')
    async_calls(httpclient, 100)
    httpclient.close()

这是一个真正的异步解决方案,或者说是最接近 CPython/python 的解决方案。没有使用轮询器。