如何在保持顺序的同时在生成器上使用线程?
How do I use threads on a generator while keeping the order?
我有一个简单的代码,它对我试图加速的生成器中的每个项目运行 GET 请求:
def stream(self, records):
# type(records) = <type 'generator'>
for record in records:
# record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
output = rest_api_lookup(record[self.input_field])
record.update(output)
yield record
现在,它在单个线程上运行并且需要永远,因为每个 REST 调用都会等到上一个 REST 调用完成。
在使用这个很好的答案 () 之前,我已经在列表中的 Python 中使用了多线程,但我不确定如何在生成器上重新使用相同的策略而不是一个列表。
我从一位推荐我的开发人员那里得到一些建议,我将生成器分解为 100 个元素的列表,然后关闭池,但我不知道如何从生成器创建这些列表。
我还需要保持原来的顺序,因为我需要 yield record
以正确的顺序。
这是一个示例,您可以如何使用 concurrent.futures
:
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
class YourClass(object):
def stream(self, records):
for record in records:
output = rest_api_lookup(record[self.input_field])
record.update(output)
# process your list and yield back result.
yield {"result_key": "whatever the result is"}
def run_parallel(self):
""" Use this method to do the parallel processing """
# The important part - concurrent futures
# - set number of workers as the number of jobs to process - suggest 4, but may differ
# this will depend on how many threads you want to run in parallel
with ThreadPoolExecutor(4) as executor:
# Use list jobs for concurrent futures
# Use list scraped_results for results
jobs = []
parallel_results = []
# Pass some keyword arguments if needed - per job
record1 = {} # your values for record1 - if need more - create
record2 = {} # your values for record2 - if need more - create
record3 = {} # your values for record3 - if need more - create
record4 = {} # your values for record4 - if need more - create
list_of_records = [[record1, record2], [record3, record4],]
for records in list_of_records:
# Here we iterate 'number of records' times, could be different
# We're adding stream, could be different function per call
jobs.append(executor.submit(self.stream, records))
# Once parallel processing is complete, iterate over results
# append results to final processing without any networking
for job in futures.as_completed(jobs):
# Read result from future
result = job.result()
# Append to the list of results
parallel_results.append(result)
# Use sorted to sort by key to preserve order
parallel_results = sorted(parallel_results, key=lambda k: k['result_key'])
# Iterate over results streamed and do whatever is needed
for result is parallel_results:
print("Do something with me {}".format(result))
我假设您不想先将生成器 records
变成列表。加快处理速度的一种方法是将记录按块传递到 ThreadPoolExecutor
中。执行者将同时处理你的 rest_api_lookup
块的所有项目。然后你只需要"unchunk"你的结果。这里有一些运行示例代码(没有使用类,抱歉,但我希望它能说明原理):
from concurrent.futures import ThreadPoolExecutor
from time import sleep
pool = ThreadPoolExecutor(8) # 8 threads, adjust to taste and # of cores
def records():
# simulates records generator
for i in range(100):
yield {'a': i}
def rest_api_lookup(a):
# simulates REST call :)
sleep(0.1)
return {'b': -a}
def stream(records):
def update_fun(record):
output = rest_api_lookup(record['a'])
record.update(output)
return record
chunk = []
for record in records:
# submit update_fun(record) into pool, keep resulting Future
chunk.append(pool.submit(update_fun, record))
if len(chunk) == 8:
yield chunk
chunk = []
if chunk:
yield chunk
def unchunk(chunk_gen):
"""Flattens a generator of Future chunks into a generator of Future results."""
for chunk in chunk_gen:
for f in chunk:
yield f.result() # get result from Future
# Now iterate over all results in same order as generated by records()
for result in unchunk(stream(records())):
print(result)
HTH!
更新:我在模拟的 REST 调用中添加了一个 sleep
,以使其更真实。这个分块版本在我的机器上在 1.5 秒内完成。顺序版本需要 10 秒(正如预期的那样,100 * 0.1s = 10s)。
by dnswlt 效果很好,但仍有待改进。如果对 REST API 的请求(或任何其他应该对每条记录完成的操作)花费可变的时间,一些 CPU 可能空闲,而每个批次的最慢请求是 运行ning。
以下解决方案将生成器和函数作为输入,并将函数应用于生成器生成的每个元素,同时保持给定数量的 运行ning 线程(每个线程将函数应用于一个元素)。同时,它仍然returns按照输入的顺序得到结果。
from concurrent.futures import ThreadPoolExecutor
import os
import random
import time
def map_async(iterable, func, max_workers=os.cpu_count()):
# Generator that applies func to the input using max_workers concurrent jobs
def async_iterator():
iterator = iter(iterable)
pending_results = []
has_input = True
thread_pool = ThreadPoolExecutor(max_workers)
while True:
# Submit jobs for remaining input until max_worker jobs are running
while has_input and \
len([e for e in pending_results if e.running()]) \
< max_workers:
try:
e = next(iterator)
print('Submitting task...')
pending_results.append(thread_pool.submit(func, e))
except StopIteration:
print('Submitted all task.')
has_input = False
# If there are no pending results, the generator is done
if not pending_results:
return
# If the oldest job is done, return its value
if pending_results[0].done():
yield pending_results.pop(0).result()
# Otherwise, yield the CPU, then continue starting new jobs
else:
time.sleep(.01)
return async_iterator()
def example_generator():
for i in range(20):
print('Creating task', i)
yield i
def do_work(i):
print('Starting to work on', i)
time.sleep(random.uniform(0, 3))
print('Done with', i)
return i
random.seed(42)
for i in map_async(example_generator(), do_work):
print('Got result:', i)
可能执行的注释输出(在具有 8 个逻辑 CPU 的机器上):
Creating task 0
Submitting task...
Starting to work on 0
Creating task 1
Submitting task...
Starting to work on 1
Creating task 2
Submitting task...
Starting to work on 2
Creating task 3
Submitting task...
Starting to work on 3
Creating task 4
Submitting task...
Starting to work on 4
Creating task 5
Submitting task...
Starting to work on 5
Creating task 6
Submitting task...
Starting to work on 6
Creating task 7
Submitting task...
Starting to work on 7 # This point is reached quickly: 8 jobs are started before any of them finishes
Done with 1 # Job 1 is done, but since job 0 is not, the result is not returned yet
Creating task 8 # Job 1 finished, so a new job can be started
Submitting task...
Creating task 9
Starting to work on 8
Submitting task...
Done with 7
Starting to work on 9
Done with 9
Creating task 10
Submitting task...
Creating task 11
Starting to work on 10
Submitting task...
Done with 3
Starting to work on 11
Done with 2
Creating task 12
Submitting task...
Creating task 13
Starting to work on 12
Submitting task...
Done with 12
Starting to work on 13
Done with 10
Creating task 14
Submitting task...
Creating task 15
Starting to work on 14
Submitting task...
Done with 8
Starting to work on 15
Done with 13 # Several other jobs are started and completed
Creating task 16
Submitting task...
Creating task 17
Starting to work on 16
Submitting task...
Done with 0 # Finally, job 0 is completed
Starting to work on 17
Got result: 0
Got result: 1
Got result: 2
Got result: 3 # The result of all completed jobs are returned in input order until the job of the next one is still running
Done with 5
Creating task 18
Submitting task...
Creating task 19
Starting to work on 18
Submitting task...
Done with 16
Starting to work on 19
Done with 11
Submitted all task.
Done with 19
Done with 4
Got result: 4
Got result: 5
Done with 6
Got result: 6 # Job 6 must have been a very long job; now that it's done, its result and the result of many subsequent jobs can be returned
Got result: 7
Got result: 8
Got result: 9
Got result: 10
Got result: 11
Got result: 12
Got result: 13
Done with 14
Got result: 14
Done with 15
Got result: 15
Got result: 16
Done with 17
Got result: 17
Done with 18
Got result: 18
Got result: 19
上面的运行耗时约4.7s,而顺序执行(设置max_workers=1
)耗时约23.6s。如果没有避免等待每批最慢执行的优化,执行大约需要 5.3s。根据各个作业时间的变化和 max_workers
,优化的效果可能会更大。
我有一个简单的代码,它对我试图加速的生成器中的每个项目运行 GET 请求:
def stream(self, records):
# type(records) = <type 'generator'>
for record in records:
# record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
output = rest_api_lookup(record[self.input_field])
record.update(output)
yield record
现在,它在单个线程上运行并且需要永远,因为每个 REST 调用都会等到上一个 REST 调用完成。
在使用这个很好的答案 () 之前,我已经在列表中的 Python 中使用了多线程,但我不确定如何在生成器上重新使用相同的策略而不是一个列表。
我从一位推荐我的开发人员那里得到一些建议,我将生成器分解为 100 个元素的列表,然后关闭池,但我不知道如何从生成器创建这些列表。
我还需要保持原来的顺序,因为我需要 yield record
以正确的顺序。
这是一个示例,您可以如何使用 concurrent.futures
:
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
class YourClass(object):
def stream(self, records):
for record in records:
output = rest_api_lookup(record[self.input_field])
record.update(output)
# process your list and yield back result.
yield {"result_key": "whatever the result is"}
def run_parallel(self):
""" Use this method to do the parallel processing """
# The important part - concurrent futures
# - set number of workers as the number of jobs to process - suggest 4, but may differ
# this will depend on how many threads you want to run in parallel
with ThreadPoolExecutor(4) as executor:
# Use list jobs for concurrent futures
# Use list scraped_results for results
jobs = []
parallel_results = []
# Pass some keyword arguments if needed - per job
record1 = {} # your values for record1 - if need more - create
record2 = {} # your values for record2 - if need more - create
record3 = {} # your values for record3 - if need more - create
record4 = {} # your values for record4 - if need more - create
list_of_records = [[record1, record2], [record3, record4],]
for records in list_of_records:
# Here we iterate 'number of records' times, could be different
# We're adding stream, could be different function per call
jobs.append(executor.submit(self.stream, records))
# Once parallel processing is complete, iterate over results
# append results to final processing without any networking
for job in futures.as_completed(jobs):
# Read result from future
result = job.result()
# Append to the list of results
parallel_results.append(result)
# Use sorted to sort by key to preserve order
parallel_results = sorted(parallel_results, key=lambda k: k['result_key'])
# Iterate over results streamed and do whatever is needed
for result is parallel_results:
print("Do something with me {}".format(result))
我假设您不想先将生成器 records
变成列表。加快处理速度的一种方法是将记录按块传递到 ThreadPoolExecutor
中。执行者将同时处理你的 rest_api_lookup
块的所有项目。然后你只需要"unchunk"你的结果。这里有一些运行示例代码(没有使用类,抱歉,但我希望它能说明原理):
from concurrent.futures import ThreadPoolExecutor
from time import sleep
pool = ThreadPoolExecutor(8) # 8 threads, adjust to taste and # of cores
def records():
# simulates records generator
for i in range(100):
yield {'a': i}
def rest_api_lookup(a):
# simulates REST call :)
sleep(0.1)
return {'b': -a}
def stream(records):
def update_fun(record):
output = rest_api_lookup(record['a'])
record.update(output)
return record
chunk = []
for record in records:
# submit update_fun(record) into pool, keep resulting Future
chunk.append(pool.submit(update_fun, record))
if len(chunk) == 8:
yield chunk
chunk = []
if chunk:
yield chunk
def unchunk(chunk_gen):
"""Flattens a generator of Future chunks into a generator of Future results."""
for chunk in chunk_gen:
for f in chunk:
yield f.result() # get result from Future
# Now iterate over all results in same order as generated by records()
for result in unchunk(stream(records())):
print(result)
HTH!
更新:我在模拟的 REST 调用中添加了一个 sleep
,以使其更真实。这个分块版本在我的机器上在 1.5 秒内完成。顺序版本需要 10 秒(正如预期的那样,100 * 0.1s = 10s)。
以下解决方案将生成器和函数作为输入,并将函数应用于生成器生成的每个元素,同时保持给定数量的 运行ning 线程(每个线程将函数应用于一个元素)。同时,它仍然returns按照输入的顺序得到结果。
from concurrent.futures import ThreadPoolExecutor
import os
import random
import time
def map_async(iterable, func, max_workers=os.cpu_count()):
# Generator that applies func to the input using max_workers concurrent jobs
def async_iterator():
iterator = iter(iterable)
pending_results = []
has_input = True
thread_pool = ThreadPoolExecutor(max_workers)
while True:
# Submit jobs for remaining input until max_worker jobs are running
while has_input and \
len([e for e in pending_results if e.running()]) \
< max_workers:
try:
e = next(iterator)
print('Submitting task...')
pending_results.append(thread_pool.submit(func, e))
except StopIteration:
print('Submitted all task.')
has_input = False
# If there are no pending results, the generator is done
if not pending_results:
return
# If the oldest job is done, return its value
if pending_results[0].done():
yield pending_results.pop(0).result()
# Otherwise, yield the CPU, then continue starting new jobs
else:
time.sleep(.01)
return async_iterator()
def example_generator():
for i in range(20):
print('Creating task', i)
yield i
def do_work(i):
print('Starting to work on', i)
time.sleep(random.uniform(0, 3))
print('Done with', i)
return i
random.seed(42)
for i in map_async(example_generator(), do_work):
print('Got result:', i)
可能执行的注释输出(在具有 8 个逻辑 CPU 的机器上):
Creating task 0
Submitting task...
Starting to work on 0
Creating task 1
Submitting task...
Starting to work on 1
Creating task 2
Submitting task...
Starting to work on 2
Creating task 3
Submitting task...
Starting to work on 3
Creating task 4
Submitting task...
Starting to work on 4
Creating task 5
Submitting task...
Starting to work on 5
Creating task 6
Submitting task...
Starting to work on 6
Creating task 7
Submitting task...
Starting to work on 7 # This point is reached quickly: 8 jobs are started before any of them finishes
Done with 1 # Job 1 is done, but since job 0 is not, the result is not returned yet
Creating task 8 # Job 1 finished, so a new job can be started
Submitting task...
Creating task 9
Starting to work on 8
Submitting task...
Done with 7
Starting to work on 9
Done with 9
Creating task 10
Submitting task...
Creating task 11
Starting to work on 10
Submitting task...
Done with 3
Starting to work on 11
Done with 2
Creating task 12
Submitting task...
Creating task 13
Starting to work on 12
Submitting task...
Done with 12
Starting to work on 13
Done with 10
Creating task 14
Submitting task...
Creating task 15
Starting to work on 14
Submitting task...
Done with 8
Starting to work on 15
Done with 13 # Several other jobs are started and completed
Creating task 16
Submitting task...
Creating task 17
Starting to work on 16
Submitting task...
Done with 0 # Finally, job 0 is completed
Starting to work on 17
Got result: 0
Got result: 1
Got result: 2
Got result: 3 # The result of all completed jobs are returned in input order until the job of the next one is still running
Done with 5
Creating task 18
Submitting task...
Creating task 19
Starting to work on 18
Submitting task...
Done with 16
Starting to work on 19
Done with 11
Submitted all task.
Done with 19
Done with 4
Got result: 4
Got result: 5
Done with 6
Got result: 6 # Job 6 must have been a very long job; now that it's done, its result and the result of many subsequent jobs can be returned
Got result: 7
Got result: 8
Got result: 9
Got result: 10
Got result: 11
Got result: 12
Got result: 13
Done with 14
Got result: 14
Done with 15
Got result: 15
Got result: 16
Done with 17
Got result: 17
Done with 18
Got result: 18
Got result: 19
上面的运行耗时约4.7s,而顺序执行(设置max_workers=1
)耗时约23.6s。如果没有避免等待每批最慢执行的优化,执行大约需要 5.3s。根据各个作业时间的变化和 max_workers
,优化的效果可能会更大。