线程中的生成器
Generator in threading
我有一个生成器 returns 我可以生成特定的字符串,如何将它与此代码一起使用?
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
我使用生成器代替上面传递的数组,因为值的数量不适合数组,我不需要存储它们。
附加问题:
在这种情况下,向下一次迭代的过渡应该在函数内部进行?
本质上与here提出的问题相同。
实质是 multiprocessing 会将任何没有 __len__
方法的可迭代对象转换为列表。
有一个 open issue 可以添加对生成器的支持,但现在,你是 SOL。
如果您的数组太大而无法放入内存,请考虑将其分块读取、处理,然后将结果分块转储到磁盘。没有更多上下文,我无法真正提供更具体的解决方案。
更新:
感谢您发布代码。我的第一个问题,是否绝对有必要使用多处理?根据 my_function
的作用,您可能看不到使用 ThreadPool
有什么好处,因为 python 受到 GIL 的限制,因此任何 CPU 绑定的工作函数都不会加速.在这种情况下,也许 ProcessPool
会更好。否则,你可能最好 运行 results = map(my_function, generator)
.
当然,如果您没有内存来加载输入数据,您就不太可能有内存来存储结果。
其次,您可以使用 itertools
改进您的生成器
尝试:
import itertools
import string
letters = string.ascii_lowercase
cod = itertools.permutations(letters, 6)
def my_function(x):
return x
def dump_results_to_disk(results, outfile):
with open(outfile, 'w') as fp:
for result in results:
fp.write(str(result) + '\n')
def process_in_chunks(generator, chunk_size=50):
accumulator = []
chunk_number = 1
for item in generator:
if len(accumulator) < chunk_size:
accumulator.append(item)
else:
results = list(map(my_function, accumulator))
dump_results_to_disk(results, "results" + str(chunk_number) + '.txt')
chunk_number += 1
accumulator = []
dump_results_to_disk(results, "results" + str(chunk_number))
process_in_chunks(cod)
显然,将 my_function()
更改为您的工作函数是什么,也许您想做一些事情而不是转储到磁盘。您可以将 chunk_size
缩放到内存中可以容纳的条目数。如果你没有磁盘 space 或结果的内存,那么你真的没有办法处理聚合数据
我有一个生成器 returns 我可以生成特定的字符串,如何将它与此代码一起使用?
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
我使用生成器代替上面传递的数组,因为值的数量不适合数组,我不需要存储它们。
附加问题: 在这种情况下,向下一次迭代的过渡应该在函数内部进行?
本质上与here提出的问题相同。
实质是 multiprocessing 会将任何没有 __len__
方法的可迭代对象转换为列表。
有一个 open issue 可以添加对生成器的支持,但现在,你是 SOL。
如果您的数组太大而无法放入内存,请考虑将其分块读取、处理,然后将结果分块转储到磁盘。没有更多上下文,我无法真正提供更具体的解决方案。
更新:
感谢您发布代码。我的第一个问题,是否绝对有必要使用多处理?根据 my_function
的作用,您可能看不到使用 ThreadPool
有什么好处,因为 python 受到 GIL 的限制,因此任何 CPU 绑定的工作函数都不会加速.在这种情况下,也许 ProcessPool
会更好。否则,你可能最好 运行 results = map(my_function, generator)
.
当然,如果您没有内存来加载输入数据,您就不太可能有内存来存储结果。
其次,您可以使用 itertools
尝试:
import itertools
import string
letters = string.ascii_lowercase
cod = itertools.permutations(letters, 6)
def my_function(x):
return x
def dump_results_to_disk(results, outfile):
with open(outfile, 'w') as fp:
for result in results:
fp.write(str(result) + '\n')
def process_in_chunks(generator, chunk_size=50):
accumulator = []
chunk_number = 1
for item in generator:
if len(accumulator) < chunk_size:
accumulator.append(item)
else:
results = list(map(my_function, accumulator))
dump_results_to_disk(results, "results" + str(chunk_number) + '.txt')
chunk_number += 1
accumulator = []
dump_results_to_disk(results, "results" + str(chunk_number))
process_in_chunks(cod)
显然,将 my_function()
更改为您的工作函数是什么,也许您想做一些事情而不是转储到磁盘。您可以将 chunk_size
缩放到内存中可以容纳的条目数。如果你没有磁盘 space 或结果的内存,那么你真的没有办法处理聚合数据