Dask fold with two data frames
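This is a textbook question on how to add two DataFrames with Dask (specifically with fold)... but I can't seem to get it to work, so I'd like to ask what I'm doing wrong.
(I'm on Python 3.8.5 and Dask 2021.4.1.)
The code below shows what I'm trying to do: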
from dask import delayed, bag
import pandas as pd

def get_df1():
    return pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

def get_df2():
    return pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})

def addit(a, b):
    return a + b

if __name__ == '__main__':
    # Without dask
    y = addit(get_df1(), get_df2())
    print(y)
    # The above code prints the desired answer:
    #    a   b
    # 0  4  10
    # 1  4  10
    # 2  4  10

    # With dask/delayed + bag + fold
    xs = [delayed(get_df1)(), delayed(get_df2)()]
    b1 = bag.from_delayed(xs)
    y = b1.fold(addit)
    print(y.compute())
    # This prints an unexpected result:
    # abab
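The abab output can be reproduced without Dask. This is a minimal sketch, assuming that Bag.fold with a single function first reduces each partition with addit and then combines the per-partition results with addit again (its documented behaviour when no separate combine function is given). Iterating over a DataFrame yields its column labels, so each partition effectively holds 'a' and 'b' instead of one DataFrame.

```python
from functools import reduce

import pandas as pd


def get_df1():
    return pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})


def get_df2():
    return pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})


def addit(a, b):
    return a + b


# Iterating a DataFrame yields its column labels, not the frame itself,
# so each simulated "partition" holds the strings 'a' and 'b'.
partitions = [list(get_df1()), list(get_df2())]
print(partitions)  # [['a', 'b'], ['a', 'b']]

# Step 1 (per partition): reduce the elements with addit -> 'ab'.
partials = [reduce(addit, part) for part in partitions]

# Step 2 (combine): reduce the partial results with addit -> 'abab'.
result = reduce(addit, partials)
print(result)  # abab
```

String addition is concatenation, which is why the column names end up glued together instead of the frames being summed.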
Answer (based on the comments below):
from dask import delayed, bag
import pandas as pd

def get_df1():
    return [pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})]  # Now a list

def get_df2():
    return [pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})]  # Now a list

def addit(a, b):
    return a + b

if __name__ == '__main__':
    # Without dask
    y = addit(*get_df1(), *get_df2())
    print(y)
    # The above code prints the desired answer:
    #    a   b
    # 0  4  10
    # 1  4  10
    # 2  4  10

    # With dask/delayed + bag + fold
    xs = [delayed(get_df1)(), delayed(get_df2)()]
    b1 = bag.from_delayed(xs)
    y = b1.fold(addit)
    print(y.compute())
    # The above code now also prints the desired answer:
    #    a   b
    # 0  4  10
    # 1  4  10
    # 2  4  10
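If the goal is only to add DataFrames produced by delayed functions, a bag may not be needed at all. This is a minimal sketch that swaps in a different technique, dask.delayed plus functools.reduce instead of Bag.fold: reducing over the delayed objects builds a chain of delayed addit calls that Dask only executes on compute(). The original get_df1/get_df2 (returning plain DataFrames) are used unchanged.

```python
from functools import reduce

import pandas as pd
from dask import delayed


def get_df1():
    return pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})


def get_df2():
    return pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})


def addit(a, b):
    return a + b


xs = [delayed(get_df1)(), delayed(get_df2)()]

# reduce calls delayed(addit)(xs[0], xs[1]); this only builds the task graph.
total = reduce(delayed(addit), xs)

# compute() runs the graph (threaded scheduler by default for delayed).
y = total.compute()
print(y)
```

With more frames, xs can simply be longer. Note that reduce builds a linear chain of additions, whereas Bag.fold performs a tree reduction across partitions.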
The problem occurs at this step:
b1 = bag.from_delayed(xs)
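What happens is that .from_delayed expects each delayed object to evaluate to a list, because each one becomes a partition of the bag. Your functions return a DataFrame, and iterating over a DataFrame yields its column labels, so the bag ends up holding 'a', 'b', 'a', 'b' and fold concatenates them into 'abab'. You need to change your functions to return a list of DataFrames: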
def get_df1():
    return [pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})]

def get_df2():
    return [pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})]
After that, your code should work.
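If changing get_df1/get_df2 is inconvenient, the wrapping can instead happen inside the task graph. This is a minimal sketch, assuming a small hypothetical helper as_partition (not part of Dask) that turns one delayed DataFrame into a one-element list, so .from_delayed receives the list it expects while the original functions stay as they were.

```python
import pandas as pd
from dask import bag, delayed


def get_df1():
    return pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})


def get_df2():
    return pd.DataFrame({'a': [3, 2, 1], 'b': [6, 5, 4]})


def addit(a, b):
    return a + b


def as_partition(obj):
    # Hypothetical helper: wrap a single object so it becomes one bag element.
    return [obj]


xs = [delayed(as_partition)(delayed(get_df1)()),
      delayed(as_partition)(delayed(get_df2)())]
b1 = bag.from_delayed(xs)

# The synchronous scheduler keeps this sketch single-process. Dropping it uses
# the bag's default process-based scheduler, which needs the
# if __name__ == '__main__': guard shown in the question.
y = b1.fold(addit).compute(scheduler='synchronous')
print(y)
```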