线程中的生成器

Generator in threading

我有一个生成器 returns 我可以生成特定的字符串,如何将它与此代码一起使用?

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

我使用生成器代替上面传递的数组,因为值的数量不适合数组,我不需要存储它们。

附加问题: 在这种情况下,向下一次迭代的过渡应该在函数内部进行?

本质上与here提出的问题相同。

实质是 multiprocessing 会将任何没有 __len__ 方法的可迭代对象转换为列表。

有一个 open issue 可以添加对生成器的支持,但现在,你是 SOL。

如果您的数组太大而无法放入内存,请考虑将其分块读取、处理,然后将结果分块转储到磁盘。没有更多上下文,我无法真正提供更具体的解决方案。

更新:

感谢您发布代码。我的第一个问题,是否绝对有必要使用多处理?根据 my_function 的作用,您可能看不到使用 ThreadPool 有什么好处,因为 python 受到 GIL 的限制,因此任何 CPU 绑定的工作函数都不会加速.在这种情况下,也许 ProcessPool 会更好。否则,你可能最好 运行 results = map(my_function, generator).

当然,如果您没有内存来加载输入数据,您就不太可能有内存来存储结果。

其次,您可以使用 itertools

改进您的生成器

尝试:

import itertools
import string

letters = string.ascii_lowercase
cod = itertools.permutations(letters, 6)

def my_function(x):
    return x

def dump_results_to_disk(results, outfile):
    with open(outfile, 'w') as fp:
        for result in results:
            fp.write(str(result) + '\n')

def process_in_chunks(generator, chunk_size=50):
    accumulator = []
    chunk_number = 1
    for item in generator:
        if len(accumulator) < chunk_size:
            accumulator.append(item)
        else:
            results = list(map(my_function, accumulator))
            dump_results_to_disk(results, "results" + str(chunk_number) + '.txt')
            chunk_number += 1
            accumulator = []
            
    dump_results_to_disk(results, "results" + str(chunk_number))

process_in_chunks(cod)

显然,将 my_function() 更改为您的工作函数是什么,也许您想做一些事情而不是转储到磁盘。您可以将 chunk_size 缩放到内存中可以容纳的条目数。如果你没有磁盘 space 或结果的内存,那么你真的没有办法处理聚合数据