如何将多个参数传递给 dask.distributed.Client().map?
How to pass multiple arguments to dask.distributed.Client().map?
import dask.distributed
def f(x, y):
return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
无效。
[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
<Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]
distributed.worker - WARNING - Compute Failed
Function: f
args: ((1, 2))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
distributed.worker - WARNING - Compute Failed
Function: f
args: ((2, 3))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
您的签名不太正确- 可能文档不明确(欢迎提出建议)。 Client.map()
为每个提交的任务获取(可变数量的)参数集,而不是单个可迭代的东西。你应该将其表述为
client.map(f, (1, 2), (2, 3))
或者,如果您想更接近原始模式
client.map(f, *[(1, 2), (2, 3)])
好的,documentation 在这个问题上肯定有点混乱。而且我找不到能清楚说明这个问题的例子。那么让我在下面分解一下:
def test_fn(a, b, c, d, **kwargs):
return a + b + c + d + kwargs["special"]
futures = client.map(test_fn, *[[1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4)], special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
futures = client.map(test_fn, [1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4), special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
注意事项:
- 使用列表还是元组都没有关系。就像我上面做的那样,你可以混合使用它们。
- 您必须按位置对参数进行分组。因此,如果您传入 4 组参数,第一个列表将包含所有 4 组参数的第一个参数。 (在这种情况下,对
test_fn
的“第一次”调用得到 a=b=c=d=1。)
- 额外的
**kwargs
(如 special
)被传递给函数。但它对所有函数调用都是相同的值。
现在想想,这并不令人惊讶。我认为它只是遵循 Python 的 concurrent.futures.ProcessPoolExecutor.map() 签名。
PS。请注意,即使文档显示“Returns:
期货的列表、迭代器或队列,具体取决于
输入。”,你实际上会得到这个错误:Dask no longer supports mapping over Iterators or Queues. Consider using a normal for loop and Client.submit
import dask.distributed
def f(x, y):
return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
无效。
[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
<Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]
distributed.worker - WARNING - Compute Failed
Function: f
args: ((1, 2))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
distributed.worker - WARNING - Compute Failed
Function: f
args: ((2, 3))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
您的签名不太正确- 可能文档不明确(欢迎提出建议)。 Client.map()
为每个提交的任务获取(可变数量的)参数集,而不是单个可迭代的东西。你应该将其表述为
client.map(f, (1, 2), (2, 3))
或者,如果您想更接近原始模式
client.map(f, *[(1, 2), (2, 3)])
好的,documentation 在这个问题上肯定有点混乱。而且我找不到能清楚说明这个问题的例子。那么让我在下面分解一下:
def test_fn(a, b, c, d, **kwargs):
return a + b + c + d + kwargs["special"]
futures = client.map(test_fn, *[[1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4)], special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
futures = client.map(test_fn, [1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4), special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
注意事项:
- 使用列表还是元组都没有关系。就像我上面做的那样,你可以混合使用它们。
- 您必须按位置对参数进行分组。因此,如果您传入 4 组参数,第一个列表将包含所有 4 组参数的第一个参数。 (在这种情况下,对
test_fn
的“第一次”调用得到 a=b=c=d=1。) - 额外的
**kwargs
(如special
)被传递给函数。但它对所有函数调用都是相同的值。
现在想想,这并不令人惊讶。我认为它只是遵循 Python 的 concurrent.futures.ProcessPoolExecutor.map() 签名。
PS。请注意,即使文档显示“Returns:
期货的列表、迭代器或队列,具体取决于
输入。”,你实际上会得到这个错误:Dask no longer supports mapping over Iterators or Queues. Consider using a normal for loop and Client.submit