Using concurrent.futures with infinite iterator and stopping criteria
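I'm trying to parallelize a loop that takes an infinite generator as input, collects some data, and stops once a certain amount of data has been received.
My implementation looks like this: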
import random
from concurrent.futures import ProcessPoolExecutor

class A:
    def __iter__(self):
        # infinite stream 0, 1, 2, ...
        i = 0
        while True:
            yield i
            i += 1

def procpar(x):
    r = random.random()
    print('Computing x =', x)
    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]

with ProcessPoolExecutor(4) as pool:
    out = []
    x = A()
    for res in pool.map(procpar, x):
        out.extend(res)
        if len(out) > 100:
            break
When I run it, I get this output, and then it hangs and nothing else happens:
Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
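Looking into what happens, the map method tries to unroll the iterator x = A() and generate all of its data up front, so it gets stuck in an infinite loop.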
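A minimal way to observe this eager consumption, assuming a small finite generator stands in for A() and a ThreadPoolExecutor stands in for the process pool (so nothing has to be pickled): count how many items have been pulled from the generator by the time map() returns, before any result is read. counting_gen and double are hypothetical names used only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor

consumed = 0  # how many items the executor has pulled from the generator so far


def counting_gen(n):
    """Hypothetical finite stand-in for A(): yields 0..n-1 and counts each item pulled."""
    global consumed
    for i in range(n):
        consumed += 1
        yield i


def double(x):
    return 2 * x


with ThreadPoolExecutor(2) as pool:
    results = pool.map(double, counting_gen(10))
    # map() has already submitted every input, although no result has been read yet
    consumed_before_reading = consumed
    values = list(results)

print(consumed_before_reading)  # 10
```

With an infinite generator such as A(), that up-front submission step never finishes, which is why the loop body is never reached.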
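Any suggestions on how to avoid getting stuck in this infinite loop? Of course, I could pull chunks from the iterator x before handing them to the process pool, but I'd like to know whether anyone has a better or more direct solution.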
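A minimal sketch of the chunking workaround, staying with concurrent.futures: slice a finite batch off the infinite iterator with itertools.islice, map over that batch, and repeat until enough data has been collected. collect_in_chunks and chunk_size are hypothetical names introduced here, and procpar is the question's function without the print call.

```python
import itertools
import random
from concurrent.futures import ProcessPoolExecutor


class A:
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1


def procpar(x):
    r = random.random()
    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]


def collect_in_chunks(executor, func, iterable, limit, chunk_size):
    """Hypothetical helper: pass `iterable` to executor.map one finite chunk at a time
    and stop as soon as more than `limit` items have been collected."""
    it = iter(iterable)
    out = []
    while len(out) <= limit:
        # take a finite slice of the (possibly infinite) stream
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:  # input exhausted (never happens for A())
            break
        for res in executor.map(func, chunk):
            out.extend(res)
            if len(out) > limit:
                break
    return out


if __name__ == '__main__':
    with ProcessPoolExecutor(4) as pool:
        out = collect_in_chunks(pool, procpar, A(), limit=100, chunk_size=16)
    print(len(out), out[:10])
```

The trade-off is that each batch waits for its slowest task before the next batch is submitted, so some workers may sit idle at batch boundaries; a larger chunk_size reduces that at the cost of computing more values you may not need.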
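Try using multiprocessing.pool.Pool.imap instead. The concurrent.futures documentation notes that for Executor.map "the iterables are collected immediately rather than lazily", whereas Pool.imap returns an iterator right away and yields results in input order as they arrive, so you can break out once you have enough: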
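from multiprocessing import Pool
import random

class A:
    def __iter__(self):
        # infinite stream 0, 1, 2, ...
        i = 0
        while True:
            yield i
            i += 1

def procpar(x):
    r = random.random()
    print('Computing x =', x)
    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]

# Required on Windows (and on any platform using the 'spawn' start method):
if __name__ == '__main__':
    with Pool(4) as pool:
        out = []
        x = A()
        # imap returns immediately and yields results in input order
        for res in pool.imap(procpar, x):
            out.extend(res)
            if len(out) > 100:
                break
    # leaving the with block terminates the pool, including any leftover tasks
    print(out)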
Prints:
Computing x = 0
Computing x = 1
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Computing x = 6
Computing x = 7
Computing x = 8
Computing x = 9
Computing x = 10
Computing x = 11
Computing x = 12
Computing x = 13
Computing x = 14
Computing x = 15
Computing x = 16
Computing x = 17
Computing x = 18
Computing x = 19
Computing x = 20
Computing x = 21
Computing x = 22
Computing x = 23
Computing x = 24
Computing x = 25
Computing x = 26
Computing x = 27
Computing x = 28
Computing x = 29
Computing x = 30
Computing x = 31
Computing x = 32
Computing x = 33
Computing x = 34
Computing x = 35
Computing x = 36
Computing x = 37
Computing x = 38
Computing x = 39
Computing x = 40
Computing x = 41
Computing x = 42
Computing x = 43
Computing x = 44
Computing x = 45
Computing x = 46
Computing x = 47
Computing x = 48
Computing x = 49
Computing x = 50
Computing x = 51
Computing x = 52
Computing x = 53
Computing x = 54
Computing x = 55
Computing x = 56
Computing x = 57
Computing x = 58
Computing x = 59
Computing x = 60
Computing x = 61
Computing x = 62
Computing x = 63
Computing x = 64
[0, 2, 1, 4, 4, 6, 8, 16, 10, 12, 14, 49, 16, 64, 18, 20, 22, 24, 144, 26, 28, 196, 30, 225, 32, 256, 34, 289, 36, 38, 361, 40, 400, 42, 441, 44, 484, 46, 529, 48, 576, 50, 625, 52, 54, 56, 58, 60, 900, 62, 961, 64, 66, 1089, 68, 1156, 70, 1225, 72, 1296, 74, 1369, 76, 78, 1521, 80, 1600, 82, 1681, 84, 1764, 86, 88, 90, 2025, 92, 2116, 94, 96, 2304, 98, 100, 102, 2601, 104, 2704, 106, 2809, 108, 110, 3025, 112, 3136, 114, 116, 3364, 118, 3481, 120, 3600, 122]
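If you would rather stay with concurrent.futures, one alternative (a different technique from imap, sketched here under the assumption that completion order is acceptable) is to keep only a bounded number of futures in flight and submit a new input each time one finishes, using concurrent.futures.wait with FIRST_COMPLETED. collect_bounded and max_pending are hypothetical names; procpar is the question's function without the print call.

```python
import itertools
import random
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


class A:
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1


def procpar(x):
    r = random.random()
    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]


def collect_bounded(executor, func, iterable, limit, max_pending):
    """Hypothetical helper: keep at most `max_pending` tasks in flight and stop once
    more than `limit` items are collected. Results are gathered in completion order."""
    it = iter(iterable)
    out = []
    # prime the window with the first max_pending inputs
    pending = {executor.submit(func, x) for x in itertools.islice(it, max_pending)}
    while pending and len(out) <= limit:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            out.extend(fut.result())
        if len(out) <= limit:
            # top the window back up: one new input per finished task
            for x in itertools.islice(it, len(done)):
                pending.add(executor.submit(func, x))
    for fut in pending:
        fut.cancel()  # drop queued work we no longer need; already-running tasks still finish
    return out


if __name__ == '__main__':
    with ProcessPoolExecutor(4) as pool:
        out = collect_bounded(pool, procpar, A(), limit=100, max_pending=8)
    print(len(out))
```

Because results are appended in completion order, out is not ordered by x; if ordering matters, Pool.imap or processing finite chunks with Executor.map keeps results in input order.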