带有两个数据框的 Dask 折叠

Dask fold with two data frames

这是一个关于如何使用 Dask(特别是折叠)添加两个 DataFrame 的教科书问题...不过我似乎无法让它工作,所以我想伸出手看看我在做什么做错了。

(我在 Python 3.8.5 和 Dask 2021.4.1)

下面的代码表明了我的意图:

from dask import delayed, bag
import pandas as pd

def get_df1():
    return pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

def get_df2():
    return pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})

def addit(a, b):
    return a + b

if __name__ == '__main__':

    # Without dask
    y = addit(get_df1(), get_df2())
    print(y)

    # The above code prints the desired answer:
    #    a   b
    # 0  4  10
    # 1  4  10
    # 2  4  10

    # With dask/delayed + bag + fold
    xs = [delayed(get_df1)(), delayed(get_df2)()]
    b1 = bag.from_delayed(xs)
    y = b1.fold(addit)
    print(y.compute())

    # This prints an unexpected result
    # abab

回答(根据下面的评论):

from dask import delayed, bag
import pandas as pd

def get_df1():
    return [pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})] # Now a list

def get_df2():
    return [pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})] # Now a list

def addit(a, b):
    return a + b

if __name__ == '__main__':

    # Without dask
    y = addit(*get_df1(), *get_df2())
    print(y)

    # The above code prints the desired answer:
    #    a   b
    # 0  4  10
    # 1  4  10
    # 2  4  10

    # With dask/delayed + bag + fold
    xs = [delayed(get_df1)(), delayed(get_df2)()]
    b1 = bag.from_delayed(xs)
    y = b1.fold(addit)
    print(y.compute())

    # The above code now also prints the desired answer:
    #    a   b
    # 0  4  10
    # 1  4  10
    # 2  4  10

问题出现在这一步:

b1 = bag.from_delayed(xs)

发生的事情是 .from_delayed 期望每个延迟计算为一个列表,因此您需要将您的函数修改为 return 数据帧列表:

def get_df1():
    return [pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})]

def get_df2():
    return [pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})]

之后你的代码应该可以工作了。