Unable to send multiple arguments to concurrent.futures.Executor.map()
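I am trying to combine the solutions from two SO answers: "Using threading to slice an array into chunks and perform calculation on each chunk and reassemble the returned arrays into one array" and "Pass multiple parameters to concurrent.futures.Executor.map?". I have a numpy array that I split into segments. I want each chunk sent to a separate thread, together with one additional argument. That extra argument is a constant and does not change. performCalc is a function that takes two arguments: a chunk of the original numpy array and the constant.
The first solution I tried: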
    import psutil
    import numpy as np
    import sys
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial

    def main():
        testThread()

    def testThread():
        minLat = -65.76892
        maxLat = 66.23587
        minLon = -178.81404
        maxLon = 176.2949
        latGrid = np.arange(minLat,maxLat,0.05)
        lonGrid = np.arange(minLon,maxLon,0.05)
        gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
        grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
        n_jobs = psutil.cpu_count(logical=False)
        chunk = np.array_split(grid_points,n_jobs,axis=0)
        x = ThreadPoolExecutor(max_workers=n_jobs)
        maxDistance = 4.3
        func = partial(performCalc,chunk)
        args = [chunk,maxDistance]
        # This prints 4.3 twice although there are four cores in the system
        results = x.map(func,args)
        # This prints "test" four times, as expected
        results1 = x.map(performTest,chunk)

    def performCalc(chunk,maxDistance):
        print(maxDistance)
        return chunk

    def performTest(chunk):
        print("test")

    main()
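So performCalc() prints 4.3 twice even though the system has four cores, while performTest() correctly prints "test" four times. I can't figure out the cause of this error.
Also, I'm sure the way I set up the functools.partial call is incorrect.
1) The original numpy array is split into four chunks.
2) Each chunk should be paired with maxDistance and sent to performCalc().
3) Four threads should print maxDistance, and the parts they return should be combined into a single result array.
Where am I going wrong?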
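For what it's worth, a stripped-down sketch suggests where the count of two comes from: Executor.map() calls the function once per item of the iterable it receives, and args = [chunk, maxDistance] has exactly two items no matter how many chunks exist. The snake_case names, the calls list, and the small lists standing in for the numpy chunks are made up for illustration and are not part of the original code.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

calls = []  # illustrative helper: records the arguments of every call

def perform_calc(chunk, max_distance):
    calls.append((chunk, max_distance))
    return chunk

# Four small lists stand in for the four numpy chunks (an assumption).
chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]
max_distance = 4.3

# Same shape as the question: the whole list of chunks is bound as the
# first argument, and map() then iterates over a two-element list.
func = partial(perform_calc, chunks)
args = [chunks, max_distance]

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(func, args))

# One call per element of args:
#   perform_calc(chunks, chunks) and perform_calc(chunks, 4.3)
print(len(calls))
```

If this reading is right, every call receives the full list of chunks as its first argument, and only one of the two calls actually gets 4.3 as the second argument; the other gets the list of chunks again.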
Update
I also tried the lambda approach:
    results = x.map(lambda p:performCalc(*p),args)
but this prints nothing.
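A likely reason nothing is printed: with args = [chunk, maxDistance], the lambda unpacks the list of four chunks into four positional arguments, and then tries to unpack the float 4.3. Both calls fail before reaching print(), and Executor.map() only re-raises a worker's exception when its results are iterated. The sketch below reproduces this with plain lists in place of the numpy chunks (an assumption, as are the snake_case names), and then shows one way to build per-chunk argument tuples that the lambda can unpack.

```python
from concurrent.futures import ThreadPoolExecutor

def perform_calc(chunk, max_distance):
    print(max_distance)
    return chunk

chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]  # stand-ins for the numpy chunks
max_distance = 4.3
args = [chunks, max_distance]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Nothing is printed and no error is visible yet: each call fails in its
    # worker thread and the exception is stored in the corresponding future.
    results = pool.map(lambda p: perform_calc(*p), args)

    # Consuming the results re-raises the first stored exception.
    try:
        list(results)
        error = None
    except TypeError as exc:
        error = exc

    # Pairing every chunk with the constant gives the lambda something
    # it can unpack into (chunk, max_distance).
    pairs = [(c, max_distance) for c in chunks]
    fixed = list(pool.map(lambda p: perform_calc(*p), pairs))

print(type(error).__name__)
```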
Using the solution provided by user mkorvas, I was able to solve my problem, as shown here:
    import psutil
    import numpy as np
    import sys
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial

    def main():
        testThread()

    def testThread():
        minLat = -65.76892
        maxLat = 66.23587
        minLon = -178.81404
        maxLon = 176.2949
        latGrid = np.arange(minLat,maxLat,0.05)
        lonGrid = np.arange(minLon,maxLon,0.05)
        print(latGrid.shape,lonGrid.shape)
        gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
        grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
        print(grid_points.shape)
        n_jobs = psutil.cpu_count(logical=False)
        chunk = np.array_split(grid_points,n_jobs,axis=0)
        x = ThreadPoolExecutor(max_workers=n_jobs)
        maxDistance = 4.3
        func = partial(performCalc,maxDistance)
        results = x.map(func,chunk)

    def performCalc(maxDistance,chunk):
        print(maxDistance)
        return chunk

    main()
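Apparently what needs to be done (I don't know why; maybe someone can clarify in another answer) is to switch the order of the parameters of performCalc(), as shown here:
    def performCalc(maxDistance,chunk):
        print(maxDistance)
        return chunk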
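On the "why": functools.partial binds positional arguments from the left, so partial(performCalc, maxDistance) always fills the first parameter with the constant, and each item supplied by map() lands in the next one. That is why the constant has to come first in the signature when it is bound positionally. If keeping the original (chunk, maxDistance) order matters, two alternatives seem to work; the sketch below uses plain lists instead of numpy chunks and snake_case names, both of which are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import repeat

def perform_calc(chunk, max_distance):
    # Original parameter order from the question: chunk first, constant second.
    return (len(chunk), max_distance)

chunks = [[1, 2], [3, 4, 5], [6], [7, 8]]  # stand-ins for the numpy chunks
max_distance = 4.3

with ThreadPoolExecutor(max_workers=4) as pool:
    # Option 1: bind the constant by keyword, so each item from map()
    # fills the remaining first positional parameter (chunk).
    func = partial(perform_calc, max_distance=max_distance)
    by_keyword = list(pool.map(func, chunks))

    # Option 2: pass a second iterable; map() zips the iterables and calls
    # perform_calc(chunk, max_distance) once per pair.
    by_repeat = list(pool.map(perform_calc, chunks, repeat(max_distance)))

print(by_keyword)  # [(2, 4.3), (3, 4.3), (1, 4.3), (2, 4.3)]
print(by_repeat)   # same list
```

Both variants return results in the order of the input chunks, since Executor.map() preserves input order.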