如何线程生成器

How to thread a generator

我有一个生成器对象,它加载了大量数据并占用了系统的 I/O。数据太大无法一次全部放入内存,因此使用生成器。 我有一个消费者,所有 CPU 来处理生成器产生的数据。它不会消耗太多其他资源。是否可以使用线程交错执行这些任务?

例如,我猜想可以在 11 秒内 运行 下面的简化代码。

import time, threading
lock = threading.Lock()
def gen():
    for x in range(10):
        time.sleep(1)
        yield x
def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return x+1

不过,最简单的线程应用在那个时候还没有运行。它确实加快了速度,但我认为是因为执行生成的调度程序和工作的调度程序之间的并行性。但不是因为工人之间的并行性。

import joblib
%time joblib.Parallel(n_jobs=2,backend='threading',pre_dispatch=2)((joblib.delayed(con)(x) for x in gen()))
# CPU times: user 0 ns, sys: 0 ns, total: 0 ns
# Wall time: 16 s

将您的数据发送到不同的进程。我使用 concurrent.futures 因为我喜欢简单的 界面 .

这在我的电脑上运行大约需要 11 秒。

from concurrent.futures import ThreadPoolExecutor
import concurrent
import threading
lock = threading.Lock()

def gen():
    for x in range(10):
        time.sleep(1)
        yield x

def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return f'{x+1}'

if __name__ == "__main__":

    futures = []
    with ThreadPoolExecutor() as executor:
        t0 = time.time()
        for x in gen():
            futures.append(executor.submit(con,x))
    results = []
    for future in concurrent.futures.as_completed(futures):
        results.append(future.result())
    print(time.time() - t0)
    print('\n'.join(results))

使用 100 次生成器迭代 (def gen(): for x in range(100):) 大约需要 102 秒。


您的 进程 可能需要跟踪有多少数据已发送到尚未完成的任务,以防止占用内存资源。

con 添加一些诊断打印件似乎表明可能一次至少有两个数据块 在那里

def con(x):
    print(f'{x} received payload at t0 + {time.time()-t0:3.3f}')
    lock.acquire()
    time.sleep(1)
    lock.release()
    print(f'{x} released lock at t0 + {time.time()-t0:3.3f}')
    return f'{x+1}'

我创建这个问题是为了查看是否有 for 循环模式的惯用插入式替换。虽然 wwii 的答案确实解决了这个问题,但它有一个警告,即如果输出相当大,生成器可能会领先于消费者线程并蜂拥而至。我也更喜欢joblib。

问题文本中 joblib 代码的问题是 gen 在主线程中迭代,因此它没有分派作业而是花时间等待 gen。当输入生成器因 joblib 变慢时,我已经放弃尝试理解调度是如此奇怪。然而,在将生产者和消费者都移入延迟函数后,我确实设法让它正确地完成了这件事。

当可迭代对象的长度实际上事先已知时(例如要逐个处理的文件列表),代码很简单。下面的代码保证了只有一个线程同时进行数据生成和一个线程进行数据消费。

sync_gen,sync_con = threading.Lock(), threading.Lock()
@joblib.delayed
def work(iterable):
    with sync_gen:
        x = next(iterable)
    with sync_con:
        return con(x)

N=10
iterable = gen()
res1 = joblib.Parallel(2,'threading')(work(iterable) for x in range(N))
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

如果生成器长度未知,线程工作者最好累积其结果而不是处理单个输入。

sync_gen,sync_con = threading.Lock(), threading.Lock()
def thread_safe(gen):
    try:
        while True:
            with sync_gen:
                x = next(gen)
            yield x
    except StopIteration:
        pass

def work2(safe_iterable):
    res = []
    for x in safe_iterable:
        with sync_con:
            res.append(con(x))
    return res

iterable = gen()
de_work2= joblib.delayed(work2)
res2 = joblib.Parallel(2,'threading')(de_work2(thread_safe(iterable)) for x in range(2))
#[[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]]

或者使用 ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
iterable = gen()
with ThreadPoolExecutor() as e:
    futures = [e.submit(work2,thread_safe(iterable)) for x in range(2)]
res = [future.result() for future in futures]