我正在使用 Dask 在多个数据集上使用 Snorkel 应用 LabelingFunction,但它似乎需要永远。这是正常的吗?

I'm using Dask to apply LabelingFunction using Snorkel on multiple datasets but it seems to take forever. Is this normal?

我的问题如下: 我有几个 csv 格式的数据集(900K、1M7 和 1M7 条目),我将它们加载到多个 Dask Dataframe 中。 然后我将它们全部连接在一个 Dask Dataframe 中,我可以将其提供给我的 Snorkel Applier,它将一堆标签函数应用到我的 Dataframe 的每一行和 return 一个 numpy 数组,其中的行数与 Dataframe 中的行数一样多以及与标签函数一样多的列。

当我使用 3 个数据集(超过 2 天......)时,对 Snorkel Applier 的调用似乎要花很长时间。但是,如果我只是 运行 只有第一个数据集的代码,调用大约需要 2 个小时。当然我不做连接步骤。

所以我想知道这怎么可能?我应该更改串联 Dataframe 中的分区数吗?或者也许我首先使用 Dask 很糟糕?

这是我使用的代码:

from snorkel.labeling.apply.dask import DaskLFApplier
import dask.dataframe as dd
import numpy as np
import os

start = time.time()

applier = DaskLFApplier(lfs)  # lfs are the function that are going to be applied, one of them featurize one of the column of my Dataframe and apply a sklearn classifier (I put n_jobs to None when loading the model)

# If I have only one CSV to read
if isinstance(PATH_TO_CSV, str):
    training_data = dd.read_csv(PATH_TO_CSV, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'})
    slices = None
 
# If I have several CSV  
elif isinstance(PATH_TO_CSV, list):
    training_data_list = [dd.read_csv(path, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'}) for path in PATH_TO_CSV]
    training_data = dd.concat(training_data_list, axis=0)

    # some useful things I do to know where to slice my final result and be sure I can assign each part to each dataset
    df_sizes = [len(df) for df in training_data_list]
    cut_idx = np.insert(np.cumsum(df_sizes), 0, 0)
    slices = list(zip(cut_idx[:-1], cut_idx[1:]))

# The call that lasts forever: I tested all the code above without that line on my 3 datasets and it runs perfectly fine
L_train = applier.apply(training_data)

end = time.time()
print('Time elapsed: {}'.format(timedelta(seconds=end-start)))

如果您需要更多信息,我会尽力为您提供。 提前感谢您的帮助:)

似乎默认情况下 applier 函数正在使用进程,因此不会受益于您可能拥有的额外工作人员:

# add this to the beginning of your code
from dask.distributed import Client
client = Client()
# you can see the address of the client by typing `client` and opening the dashboard

# skipping your other code

# you need to pass the client explicitly to the applier
# after launching this open the dashboard and watch the workers work :)
L_train = applier.apply(training_data, scheduler=client)