ray如何处理封闭范围的变量?
How does ray deal with variables of enclosing scope?
考虑以下示例:
import numpy as np
import ray
import time
A = np.array([42] * 4200)
@ray.remote
def foo1(x):
x[5]**2
@ray.remote
def foo2(j):
A[j]**2
ray.init()
#
# some warmup for ray
#
start = time.perf_counter()
for _ in range(1000):
ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start
start = time.perf_counter()
for _ in range(1000):
ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start
print(time_foo1, time_foo2)
ray.shutdown()
看来 time_foo2
明显小于 time_foo1
。我天真的猜测是,每次调用 foo1
时,ray 都会序列化 A
。然而,即使我手动将 A
放入对象存储并将对象引用传递给 foo1
,我也看不到任何改进。有人能告诉我幕后发生了什么吗?
当我 运行 你的代码时,我得到 0.8745803280000004 0.672677727
。所以 foo2 较小,但幅度不大(也许 A
在您的原始代码中更大?)。话虽这么说,这里是对 ray 正在做什么的解释。
当一个函数被注释为 ray.remote
时,它被序列化以便它可以被发送到远程进程到 运行。 Ray 使用 cloudpickle 进行序列化。当一个函数被序列化时,它的全局依赖也被序列化。
在下面的示例中,A
是对必须序列化的全局变量的依赖示例。
@ray.remote
def foo2(j):
A[j]**2
调用远程函数时,Ray 必须将参数传递给远程函数。小对象有优化,大对象逻辑大致如下:
for each arg:
if arg is an ObjectRef,
do nothing
else,
replace arg with ray.put(arg)
在远程工作者上,当调用远程函数时,我们在实际调用函数之前对所有 ObjectRef 调用 ray.get
(同样,我们只关注大对象)。 ray.get
可以受益于缓存或 zero-copy 读取等优化,因此它通常比 ray.put
.
便宜得多
实际上这意味着下面的代码
@ray.remote
def foo(arg):
# At this point ray.get(arg_ref) has already happened
A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)), which means it has happened twice now
而如果我们显式调用 ray.put
我们可以节省 put
A_ref = np.put(A)
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again, ray.put is not called
当我 运行 这些示例具有 100 万个条目矩阵时 A
我得到以下时间 (here's my sample code):
Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999
请注意,虽然序列化 A 速度很快,但这是一种不好的做法,不推荐这样做。这是因为对象放在对象存储中,序列化函数放在控件存储中,控件存储不是为传递大量数据而构建的。
考虑以下示例:
import numpy as np
import ray
import time
A = np.array([42] * 4200)
@ray.remote
def foo1(x):
x[5]**2
@ray.remote
def foo2(j):
A[j]**2
ray.init()
#
# some warmup for ray
#
start = time.perf_counter()
for _ in range(1000):
ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start
start = time.perf_counter()
for _ in range(1000):
ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start
print(time_foo1, time_foo2)
ray.shutdown()
看来 time_foo2
明显小于 time_foo1
。我天真的猜测是,每次调用 foo1
时,ray 都会序列化 A
。然而,即使我手动将 A
放入对象存储并将对象引用传递给 foo1
,我也看不到任何改进。有人能告诉我幕后发生了什么吗?
当我 运行 你的代码时,我得到 0.8745803280000004 0.672677727
。所以 foo2 较小,但幅度不大(也许 A
在您的原始代码中更大?)。话虽这么说,这里是对 ray 正在做什么的解释。
当一个函数被注释为 ray.remote
时,它被序列化以便它可以被发送到远程进程到 运行。 Ray 使用 cloudpickle 进行序列化。当一个函数被序列化时,它的全局依赖也被序列化。
在下面的示例中,A
是对必须序列化的全局变量的依赖示例。
@ray.remote
def foo2(j):
A[j]**2
调用远程函数时,Ray 必须将参数传递给远程函数。小对象有优化,大对象逻辑大致如下:
for each arg:
if arg is an ObjectRef,
do nothing
else,
replace arg with ray.put(arg)
在远程工作者上,当调用远程函数时,我们在实际调用函数之前对所有 ObjectRef 调用 ray.get
(同样,我们只关注大对象)。 ray.get
可以受益于缓存或 zero-copy 读取等优化,因此它通常比 ray.put
.
实际上这意味着下面的代码
@ray.remote
def foo(arg):
# At this point ray.get(arg_ref) has already happened
A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)), which means it has happened twice now
而如果我们显式调用 ray.put
我们可以节省 put
A_ref = np.put(A)
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again, ray.put is not called
当我 运行 这些示例具有 100 万个条目矩阵时 A
我得到以下时间 (here's my sample code):
Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999
请注意,虽然序列化 A 速度很快,但这是一种不好的做法,不推荐这样做。这是因为对象放在对象存储中,序列化函数放在控件存储中,控件存储不是为传递大量数据而构建的。