Populate numpy array through concurrent.futures multiprocessing
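I'm looking to use multiprocessing to populate a large numpy array. I've worked through the concurrent.futures examples in the documentation, but haven't gained enough understanding to adapt them.
Here is a simplified version of what I'd like to do: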
import numpy
import concurrent.futures
squares = numpy.empty((20, 2))
def make_square(i, squares):
    print('iteration', i)
    squares[i, 0], squares[i, 1] = i, i ** 2
with concurrent.futures.ProcessPoolExecutor(2) as executor:
    for i in range(20):
        executor.submit(make_square, i, squares)
The output runs as follows:
iteration 1
iteration 0
iteration 2
iteration 3
iteration 5
iteration 4
iteration 6
iteration 7
iteration 8
iteration 9
iteration 10
iteration 11
iteration 12
iteration 13
iteration 15
iteration 14
iteration 16
iteration 17
iteration 18
iteration 19
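This nicely demonstrates that the function is running concurrently. But the squares array is still empty.
What is the correct syntax to populate the squares array?
Secondly, would using .map be a better approach?
Thanks in advance!
8/2/17
Wow. So I wandered into reddit-land because I wasn't getting any takers for this problem. So happy to be back at Stack Overflow. Thanks to @ilia w495 nikitin and @donkopotamus. Here's what I posted on reddit, which explains the background to this problem in more detail.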
The posted code is an analogy of what I'm trying to do, which is populating
a numpy array with a relatively simple calculation (dot product) involving
two other arrays. The algorithm depends on a value N which can be anything
from 1 on up, though we won't likely use a value larger than 24.
I'm currently running the algorithm on a distributed computing system and
the N = 20 versions take longer than 10 days to complete. I'm using dozens
of cores to obtain the required memory, but gaining none of the benefits of
multiple CPUs. I've rewritten the code using numba which makes lower N
variants superfast on my own laptop which can't handle the memory
requirements for larger Ns, but alas, our distributed computing environment
is not currently able to install numba. So I'm attempting concurrent.futures
to take advantage of the multiple CPUs in our computing environment in the
hopes of speeding things up.
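So it's not that each calculation is time intensive; it's the 16 million+ iterations. The initialized array is N x 2 ** N, i.e. range(16777216) in the code above.
It may be that it's simply not possible to populate an array through multiprocessing.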
A good example; I think it will help you:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def return_after_5_secs(message):
    sleep(5)
    return message
pool = ProcessPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(2)
print(future.done())
sleep(2)
print(future.done())
sleep(2)
print(future.done())
print("Result: " + future.result())
A future is just a promise to do something. So I see your code like this:
import concurrent.futures
import itertools
import os
import time
import numpy
SQUARE_LIST_SIZE = 20
def main():
    # Creates an empty array.
    square_list = numpy.empty((SQUARE_LIST_SIZE, 2))
    # Creates a sequence (generator) of promises.
    future_seq = make_future_seq(square_list)
    # Creates a sequence (generator) of computed squares.
    square_seq = make_square_seq(future_seq)
    # Consumes the generator and collects the computed results in a list.
    square_list = list(square_seq)
    return square_list
def make_future_seq(squares):
    """
    Generates the sequence of empty promises.
    Creates a new process only on `submit`.
    """
    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        for i in range(SQUARE_LIST_SIZE):
            # Only makes a promise to do something.
            future = executor.submit(make_one_square, i, squares)
            print('future ', i, '= >', future)
            yield future
def make_square_seq(future_seq):
    """
    Generates the sequence of fulfilled promises.
    """
    # Just to copy the iterator.
    for_show_1, for_show_2, future_seq = itertools.tee(future_seq, 3)
    # Let's check it. Maybe it was withdrawn =)
    for i, future in enumerate(for_show_1):
        print('future ', i, 'done [1] =>', future.done())
    # Try to keep its promises.
    for future in future_seq:
        yield future.result()
    # Let's check it one more time. It is faithful too!
    for i, future in enumerate(for_show_2):
        print('future ', i, 'done [2] =>', future.done())
    return future_seq
def make_one_square(i, squares):
    # `squares` here is the copy that was sent to this worker process.
    print('inside [1] = >', i, 'pid = ', os.getpid())
    squares[i, 0], squares[i, 1] = i, i ** 2
    time.sleep(1)  # Long and hard computation.
    print('inside [2]= >', i, 'pid = ', os.getpid())
    return squares
if __name__ == '__main__':
    main()
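That is a lot of text, but it is just for explanation.
It depends, but many real-world examples need only a future.result() call.
Check this page: concurrent.futures.html
So this code will generate something like this: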
$ python test_futures_1.py
future 0 = > <Future at 0x7fc0dc758278 state=running>
future 0 done [1] => False
future 1 = > <Future at 0x7fc0dc758da0 state=pending>
inside [1] = > 0 pid = 19364
future 1 done [1] => False
inside [1] = > 1 pid = 19365
future 2 = > <Future at 0x7fc0dc758e10 state=pending>
future 2 done [1] => False
future 3 = > <Future at 0x7fc0dc758cc0 state=pending>
inside [1] = > 2 pid = 19366
future 3 done [1] => False
future 4 = > <Future at 0x7fc0dc769048 state=pending>
future 4 done [1] => False
inside [1] = > 3 pid = 19367
future 5 = > <Future at 0x7fc0dc758f60 state=running>
future 5 done [1] => False
future 6 = > <Future at 0x7fc0dc758fd0 state=pending>
future 6 done [1] => False
future 7 = > <Future at 0x7fc0dc7691d0 state=pending>
future 7 done [1] => False
future 8 = > <Future at 0x7fc0dc769198 state=pending>
future 8 done [1] => False
future 9 = > <Future at 0x7fc0dc7690f0 state=pending>
future 9 done [1] => False
future 10 = > <Future at 0x7fc0dc769438 state=pending>
future 10 done [1] => False
future 11 = > <Future at 0x7fc0dc7694a8 state=pending>
future 11 done [1] => False
future 12 = > <Future at 0x7fc0dc769550 state=pending>
future 12 done [1] => False
future 13 = > <Future at 0x7fc0dc7695f8 state=pending>
future 13 done [1] => False
future 14 = > <Future at 0x7fc0dc7696a0 state=pending>
future 14 done [1] => False
future 15 = > <Future at 0x7fc0dc769748 state=pending>
future 15 done [1] => False
future 16 = > <Future at 0x7fc0dc7697f0 state=pending>
future 16 done [1] => False
future 17 = > <Future at 0x7fc0dc769898 state=pending>
future 17 done [1] => False
future 18 = > <Future at 0x7fc0dc769940 state=pending>
future 18 done [1] => False
future 19 = > <Future at 0x7fc0dc7699e8 state=pending>
future 19 done [1] => False
inside [2]= > 0 pid = 19364
inside [2]= > 1 pid = 19365
inside [1] = > 4 pid = 19364
inside [2]= > 2 pid = 19366
inside [1] = > 5 pid = 19365
inside [1] = > 6 pid = 19366
inside [2]= > 3 pid = 19367
inside [1] = > 7 pid = 19367
inside [2]= > 4 pid = 19364
inside [2]= > 5 pid = 19365
inside [2]= > 6 pid = 19366
inside [1] = > 8 pid = 19364
inside [1] = > 9 pid = 19365
inside [1] = > 10 pid = 19366
inside [2]= > 7 pid = 19367
inside [1] = > 11 pid = 19367
inside [2]= > 8 pid = 19364
inside [2]= > 9 pid = 19365
inside [2]= > 10 pid = 19366
inside [2]= > 11 pid = 19367
inside [1] = > 13 pid = 19366
inside [1] = > 12 pid = 19364
inside [1] = > 14 pid = 19365
inside [1] = > 15 pid = 19367
inside [2]= > 14 pid = 19365
inside [2]= > 13 pid = 19366
inside [2]= > 12 pid = 19364
inside [2]= > 15 pid = 19367
inside [1] = > 16 pid = 19365
inside [1] = > 17 pid = 19364
inside [1] = > 18 pid = 19367
inside [1] = > 19 pid = 19366
inside [2]= > 16 pid = 19365
inside [2]= > 18 pid = 19367
inside [2]= > 17 pid = 19364
inside [2]= > 19 pid = 19366
future 0 done [2] => True
future 1 done [2] => True
future 2 done [2] => True
future 3 done [2] => True
future 4 done [2] => True
future 5 done [2] => True
future 6 done [2] => True
future 7 done [2] => True
future 8 done [2] => True
future 9 done [2] => True
future 10 done [2] => True
future 11 done [2] => True
future 12 done [2] => True
future 13 done [2] => True
future 14 done [2] => True
future 15 done [2] => True
future 16 done [2] => True
future 17 done [2] => True
future 18 done [2] => True
future 19 done [2] => True
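For comparison, the promise idea above can be reduced to a much shorter pattern: submit the work, call future.result() on each future as it finishes, and write the value into the array in the parent. This is only a sketch; `square_row` and `collect_squares` are hypothetical names, and the worker here returns its row instead of writing into the array it was given.

```python
import concurrent.futures

import numpy


def square_row(i):
    """Worker: compute one row and return it together with its index."""
    return i, (i, i ** 2)


def collect_squares(executor, n):
    """Submit n tasks and fill the array in the caller as results arrive."""
    squares = numpy.zeros((n, 2))
    futures = [executor.submit(square_row, i) for i in range(n)]
    # as_completed yields each future once it is done, in completion order.
    for future in concurrent.futures.as_completed(futures):
        row, values = future.result()
        squares[row] = values
    return squares


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        print(collect_squares(executor, 20))
```

Because the row index travels with the result, the order in which the futures complete does not matter.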
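The problem here is that a ProcessPoolExecutor will execute a function within a separate process.
As these are separate processes, with a separate memory space, you cannot expect any changes they make to your array (squares) to be reflected in the parent. Hence your original array is unchanged (as you are discovering).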
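A small sketch of why the parent never sees the writes: the arguments given to submit are pickled and rebuilt inside the worker, so the worker receives its own copy of the array. The code below imitates that round trip with pickle directly, without starting a process; `simulate_worker_call` is a hypothetical helper and not part of concurrent.futures.

```python
import pickle

import numpy


def make_square(i, squares):
    """Same body as the function in the question (minus the print)."""
    squares[i, 0], squares[i, 1] = i, i ** 2


def simulate_worker_call(func, *args):
    """Imitate what reaches a worker process: a pickled copy of the arguments."""
    copied_args = pickle.loads(pickle.dumps(args))
    func(*copied_args)
    return copied_args


squares = numpy.zeros((20, 2))
_, worker_copy = simulate_worker_call(make_square, 3, squares)

print(worker_copy[3])  # the worker-side copy was updated
print(squares[3])      # the caller's array was not
```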
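You need to do either of the following:
- use a ThreadPoolExecutor, but be aware that in the general case you still should not try to modify global variables from multiple threads;
- recast your code so that your process/thread does some sort of (expensive) calculation and returns the result.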
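The first option might look like the sketch below. Threads share the parent's memory, so each task can write straight into the array; every task here touches only its own row, so no two threads write to the same element. Whether this beats a plain loop depends on the real calculation releasing the GIL, which is an assumption to check; `fill_with_threads` is a hypothetical name.

```python
import concurrent.futures

import numpy


def make_square(i, squares):
    """Write row i in place; each call touches a different row."""
    squares[i, 0], squares[i, 1] = i, i ** 2


def fill_with_threads(n, max_workers=2):
    """Fill an (n, 2) array from a thread pool and return it."""
    squares = numpy.zeros((n, 2))
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        futures = [executor.submit(make_square, i, squares) for i in range(n)]
        # result() re-raises any exception that occurred in the worker thread.
        for future in futures:
            future.result()
    return squares


squares = fill_with_threads(20)
print(squares[-1])
```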
The latter would look like this:
import concurrent.futures
import numpy
squares = numpy.zeros((20, 2))
def make_square(i):
    print('iteration', i)
    # compute expensive data here ...
    # return row number and the computed data
    return i, ([i, i**2])
with concurrent.futures.ProcessPoolExecutor(2) as executor:
    # map yields results in input order; the parent writes each row itself
    for row, result in executor.map(make_square, range(20)):
        squares[row] = result
This will produce the result you are expecting:
[[ 0. 0.]
[ 1. 1.]
[ 2. 4.]
...
[ 18. 324.]
[ 19. 361.]]
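With millions of rows, sending one tiny task per row means the per-task overhead (pickling, queueing) can dominate the actual work. One common way around that is to hand each worker a block of rows and let it return a whole sub-array. The following is a sketch under that assumption; `compute_block`, `fill_in_blocks`, and the block size are hypothetical, and the row formula is the toy one from the question rather than the real dot product.

```python
import concurrent.futures

import numpy


def compute_block(bounds):
    """Worker: build rows start..stop-1 as one array and return it with its offset."""
    start, stop = bounds
    idx = numpy.arange(start, stop)
    block = numpy.column_stack((idx, idx ** 2)).astype(float)
    return start, block


def fill_in_blocks(executor, n_rows, block_size):
    """Split the row range into blocks and copy each returned block into place."""
    squares = numpy.zeros((n_rows, 2))
    bounds = [(start, min(start + block_size, n_rows))
              for start in range(0, n_rows, block_size)]
    for start, block in executor.map(compute_block, bounds):
        squares[start:start + len(block)] = block
    return squares


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(2) as executor:
        result = fill_in_blocks(executor, n_rows=1000, block_size=100)
    print(result[-1])
```

executor.map on a ProcessPoolExecutor also accepts a chunksize argument, which batches the items sent to each worker in a similar spirit; returning whole blocks additionally lets the worker use vectorized numpy operations.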