How to use dask.delayed correctly

I ran a timing experiment, but I don't think I'm using dask.delayed correctly. Here is the code:

import pandas as pd
import dask
import time

def my_operation(row_str: str):
    text_to_add = 'Five Michigan State University students—Ash Williams, his girlfriend, Linda; his sister, Cheryl; their friend Scott; and Scotts girlfriend Shelly—vacation at an isolated cabin in rural Tennessee. Approaching the cabin, the group notices the porch swing move on its own but suddenly stop as Scott grabs the doorknob. While Cheryl draws a picture of a clock, the clock stops, and she hears a faint, demonic voice tell her to "join us". Her hand becomes possessed, turns pale and draws a picture of a book with a demonic face on its cover. Although shaken, she does not mention the incident.'
    new_str = row_str + ' ' + text_to_add
    return new_str

def gen_sequential(n_rows: int):
    df = pd.read_csv('path/to/myfile.csv', nrows=n_rows)
    results_list = []
    tic = time.perf_counter()
    for ii in range(df.shape[0]):
        my_new_str = my_operation(df.iloc[ii, 0])
        results_list.append(my_new_str)
    toc = time.perf_counter()
    task_time = toc - tic
    return results_list, task_time

def gen_pandas_apply(n_rows: int):
    df = pd.read_csv('path/to/myfile.csv', nrows=n_rows)
    tic = time.perf_counter()
    df['gen'] = df['text'].apply(my_operation)
    toc = time.perf_counter()
    task_time = toc - tic
    return df, task_time

def gen_dask_compute(n_rows: int):
    df = pd.read_csv('path/to/myfile.csv', nrows=n_rows)
    results_list = []
    tic = time.perf_counter()
    for ii in range(df.shape[0]):
        my_new_str = dask.delayed(my_operation)(df.iloc[ii, 0])
        results_list.append(my_new_str)

    results_list = dask.compute(*results_list)
    toc = time.perf_counter()
    task_time = toc-tic
    return results_list, task_time

n_rows = 16
times = []
for ii in range(100):
    #_, t_dask_task = gen_sequential(n_rows)
    #_, t_dask_task = gen_pandas_apply(n_rows)
    _, t_dask_task = gen_dask_compute(n_rows)
    times.append(t_dask_task)
t_mean = sum(times)/len(times)
print('average time for 100 iterations: {}'.format(t_mean))

I ran the test with 8, 64, 256, 1024, 32768, 262144, and 1048576 rows of my file (roughly 2 million rows of text) and compared it against gen_sequential() and gen_pandas_apply(). Here are the results:

n_rows     sequential[s]        pandas_apply[s]       dask_compute[s]
=====================================================================
8          0.000288928459959    0.001460871489944     0.002077747459807
64         0.001723313619877    0.001805401749916     0.011105699519758
256        0.006383508619801    0.00198456062968      0.046899785500136
1024       0.022589521310038    0.002799118410258     0.197301750000333
32768      0.63460024946984     0.035047864249209     5.91377260136054
262144     5.28406698709983     0.254192861450574     50.5853837806704
1048576    21.1142608421401     0.967728560800169     195.71797474096

I don't think I'm using dask.delayed correctly, because for larger n_rows the average compute time is longer than for the other methods. I expected the big advantage of dask.delayed to become apparent as the dataset grows. Does anyone know where I'm going wrong? Here is my setup:

I'm currently reading about Vaex, but for this project I'm limited to using dask. Thanks in advance for your help!

my_operation takes a tiny amount of time to run per row. Dask adds overhead to every task, and even with the "threads" scheduler, Python's GIL means that non-vectorised operations like this cannot actually run in parallel.
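A rough way to see that per-task overhead for yourself: time a trivial function (invented here purely for the comparison) both through dask and as a plain loop. Absolute numbers will vary by machine, but the dask run will be markedly slower because scheduling 1000 tiny tasks costs far more than the work itself.

```python
import time
import dask

def trivial(x):
    # stands in for any cheap per-row operation
    return x + 1

# run 1000 trivial tasks through dask's threaded scheduler
tic = time.perf_counter()
dask_results = dask.compute(*[dask.delayed(trivial)(i) for i in range(1000)])
dask_time = time.perf_counter() - tic

# the same work as a plain Python loop
tic = time.perf_counter()
plain_results = [trivial(i) for i in range(1000)]
plain_time = time.perf_counter() - tic

print(f'dask: {dask_time:.4f}s  plain: {plain_time:.4f}s')
```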

Just as you should avoid iterating over a pandas dataframe, you should also avoid iterating over it and dispatching each row to dask one at a time.
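In fact, for this particular my_operation you don't even need apply: appending a constant string to a column is a vectorised pandas operation. A minimal sketch with made-up data standing in for the CSV column:

```python
import pandas as pd

# stand-in for the long constant string in my_operation
text_to_add = 'some long constant suffix'

df = pd.DataFrame({'text': ['first row', 'second row', 'third row']})
# vectorised string concatenation: one operation over the whole column,
# no Python-level loop and no per-row function calls
df['gen'] = df['text'] + ' ' + text_to_add
```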

Did you know that Dask has a pandas-like dataframe API? You could do:

import dask.dataframe as dd
df = dd.read_csv('path/to/myfile.csv')
out = df['text'].map(my_operation)
result = out.compute()  # dask is lazy: nothing runs until you call compute()

But remember: pandas is fast and efficient, so splitting your work into chunks for Dask will generally not be faster for data that fits in memory, especially if you output as much data as you input (as opposed to an aggregation).
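If you are required to use dask.delayed itself, the usual fix is to hand each task a batch of rows rather than a single row, so the per-task overhead is amortised over real work. A sketch with a stand-in operation and invented data (your my_operation and the CSV column would take their place; note that pure-Python string work still holds the GIL, so the threaded scheduler parallelises it poorly and scheduler='processes' may help for heavier functions):

```python
import dask
import numpy as np
import pandas as pd

def process_chunk(texts: pd.Series) -> list:
    # one dask task processes a whole chunk of rows at once
    return [t + ' suffix' for t in texts]

# invented data standing in for the 'text' column of the CSV
series = pd.Series(['row %d' % i for i in range(10_000)])

n_chunks = 8
chunks = np.array_split(series, n_chunks)  # preserves row order
tasks = [dask.delayed(process_chunk)(chunk) for chunk in chunks]
results = [row for chunk in dask.compute(*tasks) for row in chunk]
```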