多个进程之间的速率限制下载

Rate Limit Downloads Amongst Multiple Processes

我想从网站下载和处理大量文件。该网站的服务条款限制了您每秒允许下载的文件数量。

处理文件所花费的时间实际上是瓶颈,所以我希望能够并行处理多个文件。但我不希望不同的进程结合起来违反下载限制。所以我需要一些东西来限制过度请求率。我在想类似下面的事情,但我并不是 multiprocessing 模块的专家。

import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

然后在其他地方运行 下载

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

这似乎是小规模的工作,但我有点担心锁定是否真的正确完成。

此外,如果有更好的模式来实现相同的目标,我很想听听。

这可以用 Ray 干净地完成,它是一个用于并行和分布式的库 Python。

Ray 中的资源

当您启动 Ray 时,您可以告诉它那台机器上有哪些 资源 可用。 Ray 将自动尝试确定 CPU 核心的数量和 GPU 的数量,但这些可以指定,实际上可以传入任意 user-defined resources嗯,例如,通过调用

ray.init(num_cpus=4, resources={'Network': 2})

这告诉 Ray 该机器有 4 个 CPU 核心和 2 个名为 Network 的用户定义资源。

作为可调度工作单元的每个 Ray "task" 都有一定的资源需求。默认情况下,一项任务将需要 1 CPU 个核心,仅此而已。但是,可以通过用

声明相应的函数来指定任意资源需求
@ray.remote(resources={'Network': 1})
def f():
    pass

这告诉 Ray,为了让 f 在 "worker" 进程上执行,必须有 1 个 CPU 核心(默认值)和 1 个 Network资源可用。

由于机器有2个Network资源和4个CPU核心,最多可以同时执行2个f副本。另一方面,如果有另一个函数 g 声明为

@ray.remote
def g():
    pass

则可以并发执行4份g或同时执行2份f和2份g

例子

这是一个示例,其中包含用于下载内容和处理内容的实际函数的占位符。

import ray
import time

max_concurrent_downloads = 2

ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']

result_ids = [download_content.remote(url) for url in urls]

processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)

这是一个时间轴描述(您可以通过 运行 ray timeline 从命令行生成并在 chrome://tracing in 中打开生成的 JSON 文件Chrome 网络浏览器)。

在上面的脚本中,我们提交了 4 个 download_content 任务。这些是我们通过指定它们需要 Network 资源(除了默认的 1 CPU 资源)来限制速率的那些。然后我们提交 4 process_result 个任务,每个任务都需要默认的 1 CPU 资源。任务分三个阶段执行(只需查看 blue 框)。

  1. 我们从执行 2 个 download_content 任务开始,这是一次可以执行的任务(由于速率限制)。我们还不能执行任何 process_result 任务,因为它们依赖于 download_content 任务的输出。
  2. 这些都完成了,所以我们开始执行剩余的两个 download_content 任务以及两个 process_result 任务,因为我们没有对 process_result 任务进行速率限制。
  3. 我们执行剩余的 process_result 个任务。

每个"row"是一个工作进程。时间从左到右。

您可以在 Ray documentation.

查看更多关于如何执行此操作的信息

有一个库完全满足您的需求,名为 ratelimit

来自他们主页的示例:

此函数在 15 分钟内不能进行超过 15 API 次调用。

from ratelimit import limits

import requests

FIFTEEN_MINUTES = 900

@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response

顺便说一下,在 I/O 密集型任务(例如网络爬虫)中,您可以使用多线程,而不是多处理。在使用多处理时,您必须创建另一个进程进行控制,并协调您所做的所有事情。在多线程方法的情况下,所有线程本质上都可以访问主进程内存,因此信号变得更加容易(因为 e 在线程之间共享):

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')

最简单的方法是在主线程上下载并将文档提供给工作池。

在我自己的实现中,我走了使用 celery 处理文档和使用 gevent 下载的路线。哪个做同样的事情只是更复杂。

这是一个简单的例子。

import multiprocessing
from multiprocessing import Pool
import time
import typing

def work(doc: str) -> str:
    # do some processing here....
    return doc + " processed"

def download(url: str) -> str:
    return url  # a hack for demo, use e.g. `requests.get()`

def run_pipeline(
    urls: typing.List[str],
    session_request_limit: int = 10,
    session_length: int = 60,
) -> None:
    """
    Download and process each url in `urls` at a max. rate limit
    given by `session_request_limit / session_length`
    """
    workers = Pool(multiprocessing.cpu_count())
    results = []

    n_requests = 0
    session_start = time.time()

    for url in urls:
        doc = download(url)
        results.append(
            workers.apply_async(work, (doc,))
        )
        n_requests += 1

        if n_requests >= session_request_limit:
            time_to_next_session = session_length - time.time() - session_start
            time.sleep(time_to_next_session)

        if time.time() - session_start >= session_length:
            session_start = time.time()
            n_requests = 0

    # Collect results
    for result in results:
        print(result.get())

if __name__ == "__main__":
    urls = ["www.google.com", "www.whosebug.com"]
    run_pipeline(urls)

你在"rate limit downloads"下的意思不是很清楚。在这种情况下,它是一些并发下载,这是一个常见的用例,我认为简单的解决方案是使用带有进程池的信号量:

#!/usr/bin/env python3
import os
import time
import random
from functools import partial
from multiprocessing import Pool, Manager


CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2


def download(url, semaphore):
    pid = os.getpid()

    with semaphore:
        print('Process {p} is downloading from {u}'.format(p=pid, u=url))
        time.sleep(random.randint(1, 5))

    # Process the obtained resource:
    time.sleep(random.randint(1, 5))

    return 'Successfully processed {}'.format(url)


def main():
    manager = Manager()

    semaphore = manager.Semaphore(CONCURRENT_DOWNLOADS)
    target = partial(download, semaphore=semaphore)

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    with Pool(processes=CPU_NUM) as pool:
        results = pool.map(target, urls)

    print(results)


if __name__ == '__main__':
    main()

如您所见,一次只有CONCURRENT_DONWLOADS个进程在下载,而其他进程都在忙于处理获取的资源。

好的,在 OP

的以下澄清之后

By "downloads per second" I mean that globally there are no more than downloads started per second.

我决定 post 另一个答案,因为我认为我的第一个答案可能也会对希望限制并发 运行 进程数量的人感兴趣。

我认为没有必要使用额外的框架来解决这个问题。这个想法是使用为每个资源link、资源队列和固定数量的进程而不是线程生成的下载线程:

#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue


WORKERS = 4
DOWNLOADS_PER_SECOND = 2


def download_resource(url, resource_queue):
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)


def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()

        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))

        resource_queue.task_done()


def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()


if __name__ == '__main__':
    main()

结果看起来像这样:

$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]

这里,每秒 DOWNLOADS_PER_SECOND 个线程启动,在这个例子中是两个,然后下载资源并将资源放入队列。 WORKERS 是从队列中获取资源以进行进一步处理的进程数。使用此设置,您将能够限制每秒开始的下载数量,并让工作人员并行处理获取的资源。