Python concurrent.futures:运行线程池中的线程,直到找到结果

Python concurrent.futures run threads in pool until a result is found

我想运行 ThreadPoolExecutor 中的线程,直到其中一个返回特定的结果。现在我的代码如下所示:

# Start with four tasks, g(0)..g(3), on a four-worker thread pool.
pool = concurrent.futures.ThreadPoolExecutor(max_workers = 4)
futures = [pool.submit(g,i) for i in range (4)]
j = 4
# Handle each future as it finishes and stop at the first truthy result.
for f in concurrent.futures.as_completed(futures):
    if f.result():
        break # OK the result is found
    else:
        # A falsy result: try to queue one more task in its place.
        # NOTE(review): j is incremented before it is used, so the first
        # extra task is g(5) and g(4) is never submitted.
        j += 1
        # NOTE(review): appending to the list here does not appear to make
        # the already-running as_completed() iterator yield the new future;
        # that is the problem this question reports.
        futures.append(pool.submit(g,j))

但是最后一行的 append 似乎对 as_completed 生成器没有影响。有办法实现吗?

您可以循环检查序列中的每个 future,直到找到一个已完成的。

import concurrent.futures
import time
from collections import deque

# Seconds to sleep between polls. Without a pause the polling loop spins
# at full speed and competes with the worker threads for the GIL.
POLL_INTERVAL = 0.001

pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Keep four tasks in flight; the deque holds the futures still pending.
futures = deque(pool.submit(g, i) for i in range(4))
j = 4  # argument for the NEXT task to submit; 0..3 are already queued
result = False
while not result:
    # Cycle through the pending futures until the one at the front is done.
    while not futures[0].done():
        futures.rotate()
        time.sleep(POLL_INTERVAL)
    future = futures.popleft()
    result = future.result()
    if not result:
        # Submit first, then advance j, so that g(4) is not skipped.
        futures.append(pool.submit(g, j))
        j += 1

# Stop accepting new work without blocking on tasks that are still running.
pool.shutdown(wait=False)

这是一个类似的解决方案,使用 concurrent.futures.wait。
首先是一个用于测试的可调用对象:

import collections
import concurrent.futures
import random
import time
class F:
    """Test callable that simulates a slow task with a random outcome.

    The outcome is drawn once, at construction time: the instance is
    truthy with probability ``threshold``.  Calling the instance records
    the task number, sleeps for a random time and returns the instance
    itself, so the value a future yields can be tested with ``bool()``.

    Args:
        threshold: Probability (0..1) that the instance is truthy.
        max_delay: Upper bound, in seconds, of the random sleep performed
            by each call.  The default of 1.0 keeps the original timing.
    """

    def __init__(self, threshold=.03, *, max_delay=1.0):
        self._bool = random.random() < threshold
        self._max_delay = max_delay
        # Defined up front so str()/repr() work before the first call.
        self.n = None

    def __call__(self, n):
        """Record task number *n*, sleep a random time, return ``self``."""
        self.n = n
        time.sleep(random.random() * self._max_delay)
        return self

    def __bool__(self):
        """Return the outcome drawn in ``__init__``."""
        return self._bool

    def __repr__(self):
        return f'I am number {self.n}'

    # str() and repr() intentionally produce the same text.
    __str__ = __repr__

解决方案

# Look for a truthy result while keeping four F() tasks in flight, blocking
# in concurrent.futures.wait() until at least one of them has finished.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
j = 4  # argument for the NEXT task to submit; 0..3 are queued just below
futures = [pool.submit(F(), i) for i in range(j)]
result = False
while not result:
    maybe_futures = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)
    # Carry the still-pending futures (a set) into the next round.
    futures = maybe_futures.not_done
    # `done` is a set and may hold more than one future when several
    # finished at about the same time.
    for future in maybe_futures.done:
        temp = future.result()
        if temp:
            result = temp
            break
        # Replace the falsy task. Submit first, then advance j, so that
        # task number 4 is not skipped.
        futures.add(pool.submit(F(), j))
        j += 1

# Stop accepting new work without blocking on tasks that are still running.
pool.shutdown(wait=False)

另一种使用回调的解决方案(使用上面的可调用对象):不确定如果 future 在添加回调之前就已完成会发生什么。(根据官方文档,此时回调会被立即调用。)

# Seconds to sleep between polls. Without a pause the polling loop spins
# at full speed and competes with the worker threads for the GIL.
POLL_INTERVAL = 0.001

# Finished futures are pushed here by the done-callback and drained by the
# main thread; deque append/popleft are documented as thread-safe.
completed_futures = collections.deque()
result = False


def callback(future, completed=completed_futures):
    """Done-callback: queue the finished *future* for the main loop.

    Per the concurrent.futures documentation, a callback added to a future
    that has already finished is invoked immediately, so no completion is
    lost when a task finishes before ``add_done_callback`` is reached.
    """
    completed.append(future)


j = 4  # argument for the NEXT task to submit; 0..3 are queued just below

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    # Initial tasks.
    for i in range(j):
        pool.submit(F(), i).add_done_callback(callback)
    # Not the same as `while True`: the `break` below only leaves the inner
    # loop, so this condition is what actually ends the search.
    while not result:
        # Wait until at least one task has finished.
        while not completed_futures:
            time.sleep(POLL_INTERVAL)
        while completed_futures:
            future = completed_futures.popleft()
            result = future.result()
            if result:
                break
            # Replace the falsy task. Submit first, then advance j, so
            # that task number 4 is not skipped.
            future = pool.submit(F(), j)
            future.add_done_callback(callback)
            j += 1
# Leaving the `with` block waits for the tasks still running to finish.
print(result)