Rate Limit Downloads Amongst Multiple Processes
I want to download and process a lot of files from a website. The terms of service for the site limit the number of files you're allowed to download per second.

The time it takes to process the files is actually the bottleneck, so I'd like to be able to process multiple files in parallel. But I don't want the different processes to combine to violate the download limit. So I need something that limits the over-request rate. I was thinking something like the following, but I'm not exactly an expert with the multiprocessing module.
import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url

class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)

class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")
Then somewhere else I run the downloads with
manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()
This seems to work on a small scale, but I'm a little wary about whether the locking is really done correctly.

Also, if there's a better pattern for achieving the same goal, I'd love to hear it.
This can be done cleanly with Ray, which is a library for parallel and distributed Python.

Resources in Ray

When you start Ray, you can tell it what resources are available on that machine. Ray will automatically attempt to determine the number of CPU cores and the number of GPUs, but these can be specified, and in fact arbitrary user-defined resources can be passed in as well, e.g., by calling
ray.init(num_cpus=4, resources={'Network': 2})
This tells Ray that the machine has 4 CPU cores and 2 of a user-defined resource called Network.
Each Ray "task", which is a schedulable unit of work, has some resource requirements. By default, a task will require 1 CPU core and nothing else. However, arbitrary resource requirements can be specified by declaring the corresponding function with
@ray.remote(resources={'Network': 1})
def f():
    pass
This tells Ray that in order for f to execute on a "worker" process, there must be 1 CPU core (the default value) and 1 Network resource available.
Since the machine has 2 Network resources and 4 CPU cores, at most 2 copies of f can be executed concurrently. On the other hand, if there is another function g declared with
@ray.remote
def g():
    pass
then four copies of g can be executed concurrently, or two copies of f and two copies of g can be executed concurrently.
Example
Here is an example, with placeholders for the actual functions for downloading content and processing content.
import ray
import time

max_concurrent_downloads = 2

ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']
result_ids = [download_content.remote(url) for url in urls]
processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)
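If you print the collected results at the end, they come back in the same order as the input urls, since ray.get preserves the order of the object IDs passed to it. A quick check you can append to the script above:

print(processed_results)
# ['processed result from url1', 'processed result from url2',
#  'processed result from url3', 'processed result from url4']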
Here is a timeline depiction (which you can generate by running ray timeline from the command line and opening the resulting JSON file in chrome://tracing in the Chrome web browser).
In the above script, we submit 4 download_content tasks. Those are the ones that we rate limit by specifying that they require a Network resource (in addition to the default 1 CPU resource). Then we submit 4 process_result tasks, each of which requires the default 1 CPU resource. The tasks execute in three stages (just look at the blue boxes).
- We start by executing 2 download_content tasks, which is as many as can be executed at one time (because of the rate limiting). We can't execute any process_result tasks yet because they depend on the output of the download_content tasks.
- Those finish, so we start executing the remaining two download_content tasks as well as two process_result tasks, because we are not rate limiting the process_result tasks.
- We execute the remaining process_result tasks.
Each "row" is one worker process. Time goes from left to right.
You can see more about how to do this in the Ray documentation.
There is a library that does exactly what you need, called ratelimit.

An example from their home page:

This function will not be able to make more than 15 API calls within a 15-minute time period.
from ratelimit import limits

import requests

FIFTEEN_MINUTES = 900

@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response
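One caveat worth adding: by default, limits raises a ratelimit.RateLimitException once the quota is exhausted, and it only counts calls made within the current process, so by itself it won't coordinate the limit across multiple worker processes. If you'd rather have the caller block until a slot is free instead of raising, the library also ships a sleep_and_retry decorator. A minimal sketch, where the one-call-per-half-second numbers are placeholders for the real site limit:

from ratelimit import limits, sleep_and_retry
import requests

@sleep_and_retry                      # sleep until a call is allowed, then retry
@limits(calls=1, period=0.5)          # at most one call per half second
def download(url):
    return requests.get(url).content  # fetch and return the raw body

for url in ['https://example.com/a', 'https://example.com/b']:
    download(url)                     # the second call blocks ~0.5 s automatically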
By the way, for I/O-heavy tasks such as web scraping, you can use multithreading instead of multiprocessing. With multiprocessing, you have to create another process for control and coordinate everything you do. With a multithreaded approach, all threads inherently have access to the main process's memory, so signaling becomes much easier (since e is shared between the threads):
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')

e = threading.Event()
t1 = threading.Thread(name='block',
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block',
                      target=wait_for_event_timeout,
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
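Tying this back to the download problem: because all threads share the main process's memory, a plain threading.Lock is enough to space out download starts across workers. A minimal sketch along those lines (the fetch body and the 0.5-second spacing are placeholders, not part of any particular library):

import threading
import time

DOWNLOAD_INTERVAL = 0.5          # minimum seconds between download starts
rate_lock = threading.Lock()
last_start = [0.0]               # boxed in a list so worker threads can mutate it

def rate_limited_download(url):
    # Serialize only the *start* of each download to enforce the spacing.
    with rate_lock:
        wait = DOWNLOAD_INTERVAL - (time.monotonic() - last_start[0])
        if wait > 0:
            time.sleep(wait)
        last_start[0] = time.monotonic()
    # The actual transfer happens outside the lock, so downloads overlap.
    return 'content of ' + url   # placeholder; use e.g. requests.get(url)

threads = [threading.Thread(target=rate_limited_download,
                            args=('url{}'.format(i),))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()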
The simplest approach is to download on the main thread and feed the documents to a worker pool.

In my own implementations I've gone the route of using celery for processing the documents and gevent for the downloads. Which does the same thing, just with more complexity.

Here's a simple example.
import multiprocessing
from multiprocessing import Pool
import time
import typing

def work(doc: str) -> str:
    # do some processing here....
    return doc + " processed"

def download(url: str) -> str:
    return url  # a hack for demo, use e.g. `requests.get()`

def run_pipeline(
    urls: typing.List[str],
    session_request_limit: int = 10,
    session_length: int = 60,
) -> None:
    """
    Download and process each url in `urls` at a max. rate limit
    given by `session_request_limit / session_length`
    """
    workers = Pool(multiprocessing.cpu_count())
    results = []

    n_requests = 0
    session_start = time.time()

    for url in urls:
        doc = download(url)
        results.append(
            workers.apply_async(work, (doc,))
        )
        n_requests += 1

        if n_requests >= session_request_limit:
            # Note the parentheses: sleep for the *remainder* of the session.
            time_to_next_session = session_length - (time.time() - session_start)
            time.sleep(time_to_next_session)

        if time.time() - session_start >= session_length:
            session_start = time.time()
            n_requests = 0

    # Collect results
    for result in results:
        print(result.get())

if __name__ == "__main__":
    urls = ["www.google.com", "www.whosebug.com"]
    run_pipeline(urls)
It's not really clear what you mean by "rate limit downloads". In the case that it means the number of concurrent downloads, which is a common use case, I think the simple solution is to use a semaphore with a process pool:
#!/usr/bin/env python3
import os
import time
import random
from functools import partial
from multiprocessing import Pool, Manager

CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2

def download(url, semaphore):
    pid = os.getpid()
    with semaphore:
        print('Process {p} is downloading from {u}'.format(p=pid, u=url))
        time.sleep(random.randint(1, 5))
    # Process the obtained resource:
    time.sleep(random.randint(1, 5))
    return 'Successfully processed {}'.format(url)

def main():
    manager = Manager()

    semaphore = manager.Semaphore(CONCURRENT_DOWNLOADS)
    target = partial(download, semaphore=semaphore)

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    with Pool(processes=CPU_NUM) as pool:
        results = pool.map(target, urls)

    print(results)

if __name__ == '__main__':
    main()
As you can see, at any moment only CONCURRENT_DOWNLOADS processes are downloading, while the rest are busy processing the fetched resources.
OK, after the following clarification from the OP

By "downloads per second" I mean that globally there are no more than downloads started per second.

I decided to post another answer, since I think my first one might also be of interest to someone looking to limit the number of concurrently running processes.

I think there's no need to use additional frameworks to solve this problem. The idea is to use downloader threads spawned for each resource link, a resource queue, and a fixed number of processes, not threads, for the processing:
#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue

WORKERS = 4
DOWNLOADS_PER_SECOND = 2

def download_resource(url, resource_queue):
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)

def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()
        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))
        resource_queue.task_done()

def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()

if __name__ == '__main__':
    main()
The result looks something like this:
$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]
Here, DOWNLOADS_PER_SECOND threads are started every second, two in this example, which then download resources and put them into the queue. WORKERS is the number of processes that take resources from the queue for further processing. With this setup, you'll be able to limit the number of downloads started per second and have the workers process the fetched resources in parallel.
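As an aside, the OP's original BaseManager/Lock idea can also be made to pace "starts per second" directly. The tweak in the sketch below only sleeps for the remaining part of the interval rather than the full delay on every call, so sparse requests don't pay an unnecessary penalty. This is just a sketch under the same assumptions as above, with a hypothetical 0.5-second interval standing in for the real site limit:

import multiprocessing
import time
from multiprocessing.managers import BaseManager

class RateLimiter:
    """Allow at most one acquire() per `interval` seconds, across processes."""

    def __init__(self, interval):
        self.interval = interval
        self.lock = multiprocessing.Lock()
        self.next_start = time.monotonic()

    def acquire(self):
        # Proxy calls execute inside the manager process; the lock just
        # serializes concurrent callers there.
        with self.lock:
            now = time.monotonic()
            if now < self.next_start:
                time.sleep(self.next_start - now)
            self.next_start = max(now, self.next_start) + self.interval

class LimiterManager(BaseManager):
    pass

LimiterManager.register('RateLimiter', RateLimiter)

def job(limiter, url):
    limiter.acquire()              # blocks until this process may start
    print('downloading', url)      # placeholder for the real transfer

if __name__ == '__main__':
    manager = LimiterManager()
    manager.start()
    limiter = manager.RateLimiter(0.5)   # at most 2 download starts per second

    procs = [multiprocessing.Process(target=job,
                                     args=(limiter, 'url{}'.format(i)))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()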