Why is a computation much slower within a Dask/Distributed worker?

I have a computation that runs much slower within a Dask/Distributed worker than it does locally. I can reproduce it without any I/O, so I can rule out that it has anything to do with transferring data. The following code is a minimal reproducing example:

import time
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster


def gen_data(N=5000000):
    """ Dummy data generator """
    df = pd.DataFrame(index=range(N))
    for c in range(10):
        df[str(c)] = np.random.uniform(size=N)
    df["id"] = np.random.choice(range(100), size=len(df))
    return df


def do_something_on_df(df):
    """ Dummy computation that contains inplace mutations """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


def run_test():
    """ Test computation """
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)


class TimedContext(object):
    def __enter__(self):
        self.t1 = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.t2 = time.time()
        print(self.t2 - self.t1)

if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")

    with TimedContext():
        run_test()

    with TimedContext():
        client.submit(run_test).result()

Running the test computation locally takes ~10 seconds, but within Dask/Distributed it takes ~30 seconds. I also noticed that the Dask/Distributed workers output many log messages like

distributed.core - WARNING - Event loop was unresponsive for 1.03s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.25s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.99s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.90s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 2.23s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...

This is surprising, because it is not clear what is holding the GIL in this example.

Why is there such a big performance difference? And what can I do to get the same performance?

Disclaimer: Self-answering for documentation purposes...

This behavior is the result of a very surprising behavior of Pandas. By default, Pandas' __setitem__ handlers perform checks to detect chained assignments, leading to the famous SettingWithCopyWarning. When dealing with copies, these checks issue a call to gc.collect. Thus, code that uses __setitem__ excessively leads to excessive gc.collect calls. This in general has a significant impact on performance, but the problem is much worse within a Dask/Distributed worker, because the garbage collection has to deal with many more Python data structures compared to running standalone. Most likely the hidden garbage collection calls are also the source of the GIL-holding warnings.
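One way to confirm that hidden garbage collections are the culprit is to count them with a gc callback while the computation runs. The GCCounter helper below is a hypothetical name, not part of Pandas or Dask; wrapping run_test() in it (locally or inside the submitted function) should show a surprisingly large count when the chained-assignment checks fire:

```python
import gc


class GCCounter:
    """Context manager that counts garbage collections while active."""

    def __init__(self):
        self.count = 0

    def _callback(self, phase, info):
        # gc invokes registered callbacks with phase "start"/"stop"
        # around every collection, including explicit gc.collect calls.
        if phase == "start":
            self.count += 1

    def __enter__(self):
        gc.callbacks.append(self._callback)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        gc.callbacks.remove(self._callback)


# Sanity check: an explicit collection is seen by the counter.
with GCCounter() as counter:
    gc.collect()
print(counter.count)
```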

Therefore, the solution is to avoid these excessive gc.collect calls. There are two ways to do so:

  • Avoid calling __setitem__ on copies: Arguably the best solution, but it requires some understanding of where the copies are produced. In the example above, this can be achieved by changing the function call to do_something_on_df(group_df.copy()).
  • Disable the chained-assignment checks: Simply placing pd.options.mode.chained_assignment = None at the beginning of the computation also disables the gc.collect calls.
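As a minimal sketch of the first option (reusing the dummy do_something_on_df from the question on a small frame): the only change is the explicit .copy(), which hands the function a frame that is not flagged as a potential chained-assignment target, so __setitem__ skips the check and its gc.collect call:

```python
import numpy as np
import pandas as pd


def do_something_on_df(df):
    """ Dummy computation that contains inplace mutations """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


df = pd.DataFrame({"0": np.random.uniform(size=1000),
                   "id": np.random.choice(range(10), size=1000)})

results = []
for key, group_df in df.groupby("id"):
    # Passing an explicit copy avoids __setitem__ on a groupby slice,
    # and with it the chained-assignment check.
    results.append(do_something_on_df(group_df.copy()))

print(results)
```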

In both cases, the test computation now runs much faster than before, at ~3.5 seconds, both locally and under Dask/Distributed. This also gets rid of the GIL-holding warnings.

Note: I have filed an issue for this on GitHub.