ray如何处理封闭范围的变量?

How does ray deal with variables of enclosing scope?

考虑以下示例:

import numpy as np
import ray
import time

A = np.array([42] * 4200)   

@ray.remote
def foo1(x):
    x[5]**2

@ray.remote
def foo2(j):
    A[j]**2

ray.init()

#
# some warmup for ray
#

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start

print(time_foo1, time_foo2)

ray.shutdown()

看来 time_foo2 明显小于 time_foo1。我天真的猜测是,每次调用 foo1 时,ray 都会序列化 A。然而,即使我手动将 A 放入对象存储并将对象引用传递给 foo1,我也看不到任何改进。有人能告诉我幕后发生了什么吗?

当我 运行 你的代码时,我得到 0.8745803280000004 0.672677727。所以 foo2 较小,但幅度不大(也许 A 在您的原始代码中更大?)。话虽这么说,这里是对 ray 正在做什么的解释。

当一个函数被注释为 ray.remote 时,它被序列化以便它可以被发送到远程进程到 运行。 Ray 使用 cloudpickle 进行序列化。当一个函数被序列化时,它的全局依赖也被序列化。

在下面的示例中,A 是对必须序列化的全局变量的依赖示例。

@ray.remote
def foo2(j):
    A[j]**2

调用远程函数时,Ray 必须将参数传递给远程函数。小对象有优化,大对象逻辑大致如下:

for each arg:
    if arg is an ObjectRef,
        do nothing
    else,
        replace arg with ray.put(arg)

在远程工作者上,当调用远程函数时,我们在实际调用函数之前对所有 ObjectRef 调用 ray.get(同样,我们只关注大对象)。 ray.get 可以受益于缓存或 zero-copy 读取等优化,因此它通常比 ray.put.

便宜得多

实际上这意味着下面的代码

@ray.remote
def foo(arg):
    # At this point ray.get(arg_ref) has already happened

A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)), which means it has happened twice now

而如果我们显式调用 ray.put 我们可以节省 put

A_ref = np.put(A) 
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again, ray.put is not called

当我 运行 这些示例具有 100 万个条目矩阵时 A 我得到以下时间 (here's my sample code):

Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999

请注意,虽然序列化 A 速度很快,但这是一种不好的做法,不推荐这样做。这是因为对象放在对象存储中,序列化函数放在控件存储中,控件存储不是为传递大量数据而构建的。