Unable to send multiple arguments to concurrent.futures.Executor.map()

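I'm trying to combine the solutions provided in these two SO answers: "Using threading to slice an array into chunks and perform calculation on each chunk and reassemble the returned arrays into one array" and "Pass multiple parameters to concurrent.futures.Executor.map?". I have a numpy array that I split into several chunks. I want each chunk sent to a separate thread, together with one additional argument. This extra argument is a constant and never changes. performCalc is a function that takes two arguments: a chunk of the original numpy array and that constant.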

My first attempt:

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)

    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]

    n_jobs = psutil.cpu_count(logical=False)

    chunk = np.array_split(grid_points,n_jobs,axis=0)

    x = ThreadPoolExecutor(max_workers=n_jobs)
    maxDistance = 4.3
    func = partial(performCalc,chunk)
    args = [chunk,maxDistance]
    # This prints 4.3 twice although there are four cores in the system
    results = x.map(func,args)
    # This prints "test" four times correctly
    results1 = x.map(performTest,chunk)

def performCalc(chunk,maxDistance):
    print(maxDistance)
    return chunk

def performTest(chunk):
    print("test")

main()

So performCalc() prints 4.3 only twice, even though the system has four cores, whereas performTest() correctly prints "test" four times. I can't figure out the cause of this.

Also, I'm sure the way I set up the functools.partial call is wrong.
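A minimal sketch of what the first attempt actually calls, assuming hypothetical string stand-ins such as "c0" in place of the four numpy chunks. Executor.map calls func once per element of the iterable it receives, and args = [chunk, maxDistance] has only two elements, so there are only two calls no matter how many workers exist. In addition, partial(performCalc, chunk) binds the whole list of chunks as performCalc's first parameter.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def performCalc(chunk, maxDistance):
    # Return both arguments so we can inspect what each call received
    return (chunk, maxDistance)

chunks = ["c0", "c1", "c2", "c3"]  # hypothetical stand-ins for the numpy chunks
maxDistance = 4.3

with ThreadPoolExecutor(max_workers=4) as ex:
    func = partial(performCalc, chunks)  # binds the WHOLE list as `chunk`
    calls = list(ex.map(func, [chunks, maxDistance]))

print(len(calls))  # 2: one call per element of [chunks, maxDistance]
print(calls[0])    # second argument is the list of chunks, not 4.3
print(calls[1])    # only this call receives 4.3
```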

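1) The original numpy array is split into four chunks.

2) Each chunk is paired with maxDistance and sent to performCalc().

3) Four threads each print maxDistance and return their part of the overall result, which is then reassembled into one array.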
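A sketch of steps 1 to 3, assuming a small stand-in grid instead of the full lat/lon grid and a placeholder performCalc that returns its chunk unchanged. Executor.map accepts several iterables and zips them, so pairing the chunks with itertools.repeat(maxDistance) passes the constant to every call without partial; np.concatenate then reassembles the returned chunks in their original order.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor

import numpy as np

def performCalc(chunk, maxDistance):
    # Placeholder calculation (assumption): return the chunk unchanged
    return chunk

grid_points = np.arange(20, dtype=float).reshape(10, 2)  # small stand-in grid
n_jobs = 4
chunks = np.array_split(grid_points, n_jobs, axis=0)      # step 1
maxDistance = 4.3

with ThreadPoolExecutor(max_workers=n_jobs) as ex:
    # Step 2: map zips its iterables, so call i gets (chunks[i], maxDistance);
    # zip stops at the shorter iterable, so the infinite repeat is safe.
    parts = list(ex.map(performCalc, chunks, itertools.repeat(maxDistance)))

# Step 3: map yields results in input order, so concatenation restores the grid
result = np.concatenate(parts, axis=0)
```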

Where am I going wrong?

Update

I also tried the lambda approach:

results = x.map(lambda p:performCalc(*p),args)

but this prints nothing.
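One plausible explanation, sketched with hypothetical string stand-ins for the chunks: the lambda unpacks each element of args, but neither element is a (chunk, maxDistance) pair, so every call raises TypeError inside its worker thread before reaching print. Executor.map stores such exceptions in the futures and only re-raises them when the results iterator is consumed, which results = x.map(...) alone never does. Passing one tuple per chunk makes the lambda approach work.

```python
from concurrent.futures import ThreadPoolExecutor

def performCalc(chunk, maxDistance):
    return (chunk, maxDistance)

chunks = ["c0", "c1", "c2", "c3"]  # hypothetical stand-ins for the numpy chunks
maxDistance = 4.3

error = None
with ThreadPoolExecutor(max_workers=4) as ex:
    # Each element p of [chunks, maxDistance] is unpacked with *p:
    # *chunks gives 4 arguments and *4.3 is not iterable, so both calls fail.
    broken = ex.map(lambda p: performCalc(*p), [chunks, maxDistance])
    try:
        list(broken)  # the stored TypeError surfaces only here
    except TypeError as exc:
        error = exc

    # Build one (chunk, maxDistance) tuple per chunk instead
    pairs = [(c, maxDistance) for c in chunks]
    fixed = list(ex.map(lambda p: performCalc(*p), pairs))
```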

Using the solution provided by user mkorvas, as shown here, I was able to solve my problem:

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)
    print(latGrid.shape,lonGrid.shape)
    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
    print(grid_points.shape)
    n_jobs = psutil.cpu_count(logical=False)
    chunk = np.array_split(grid_points,n_jobs,axis=0)
    x = ThreadPoolExecutor(max_workers=n_jobs)

    maxDistance = 4.3
    func = partial(performCalc,maxDistance)

    results = x.map(func,chunk)

def performCalc(maxDistance,chunk):

    print(maxDistance)
    return chunk

main()

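What apparently needs to be done (I don't know why; perhaps someone can clarify in another answer) is to swap the order of the parameters of performCalc(), as shown: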

def performCalc(maxDistance,chunk):

    print(maxDistance)
    return chunk
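As for why the order matters: functools.partial(f, a) fixes f's first positional parameter, and Executor.map then supplies each chunk as the next positional argument. A sketch (again with hypothetical stand-in chunks) that keeps the original performCalc(chunk, maxDistance) order by binding the constant as a keyword argument instead:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def performCalc(chunk, maxDistance):  # original parameter order kept
    return (chunk, maxDistance)

chunks = ["c0", "c1", "c2", "c3"]  # hypothetical stand-ins for the numpy chunks
maxDistance = 4.3

# Binding by keyword leaves `chunk` as the first positional slot,
# so func("c0") becomes performCalc("c0", maxDistance=4.3).
func = partial(performCalc, maxDistance=maxDistance)

with ThreadPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(func, chunks))
```

Either swapping the parameters or binding by keyword works; the keyword form just avoids changing performCalc's signature.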