Why is a computation much slower within a Dask/Distributed worker?
I have a computation that runs much slower within a Dask/Distributed worker than it does locally. I can reproduce it without any I/O, so I can rule out anything related to transferring data. The following code is a minimal reproduction example:
```python
import time
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster


def gen_data(N=5000000):
    """Dummy data generator"""
    df = pd.DataFrame(index=range(N))
    for c in range(10):
        df[str(c)] = np.random.uniform(size=N)
    df["id"] = np.random.choice(range(100), size=len(df))
    return df


def do_something_on_df(df):
    """Dummy computation that contains inplace mutations"""
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


def run_test():
    """Test computation"""
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)


class TimedContext(object):
    def __enter__(self):
        self.t1 = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.t2 = time.time()
        print(self.t2 - self.t1)


if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")

    with TimedContext():
        run_test()

    with TimedContext():
        client.submit(run_test).result()
```
Running the test computation locally takes ~10 seconds, but within Dask/Distributed it takes ~30 seconds. I also noticed that the Dask/Distributed workers output many log messages like:
distributed.core - WARNING - Event loop was unresponsive for 1.03s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.25s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.91s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.99s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.50s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.90s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 2.23s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...
This is surprising, because it is not clear what is holding the GIL in this example.

Why is there such a big performance difference? And what can I do to get the same performance?
Disclaimer: self-answering for documentation purposes...
This behavior is the result of a rather surprising behavior of Pandas. By default, Pandas' `__setitem__` handlers perform checks to detect chained assignments, leading to the famous `SettingWithCopyWarning`. When operating on a copy, these checks issue a call to `gc.collect`. Thus, code that uses `__setitem__` excessively will lead to excessive `gc.collect` calls. This typically has a significant impact on performance in general, but the problem is much worse within a Dask/Distributed worker, because the garbage collection has to deal with many more Python data structures compared to running stand-alone. Most likely the hidden garbage collection calls are also the source of the GIL-holding warnings.
Therefore, the solution is to avoid these excessive `gc.collect` calls. There are two ways:

- Avoid using `__setitem__` on copies: arguably the best solution, but it requires some understanding of where copies are produced. In the example above, this can be achieved by changing the function call to `do_something_on_df(group_df.copy())`.
- Disable the chained-assignment checks: simply putting `pd.options.mode.chained_assignment = None` at the beginning of the computation also disables the `gc.collect` calls.
In both cases the test computation now runs much faster than before, at ~3.5 seconds, both locally and under Dask/Distributed. This also gets rid of the GIL-holding warnings.

Note: I have filed an issue for this on GitHub.