.join 在 dask 数据帧中的结果似乎取决于方式,它们生成了 dask 数据帧
Result of .join in dask dataframes seems to depend on the way, the dask dataframe was generated
将join
应用于由.from_delayed方法生成的dask数据帧时,我得到了意想不到的结果。我想通过以下示例来证明这一点,该示例由三部分组成。
- 通过
from_delayed
方法生成dask数据帧并将其与通过from_pandas
生成的dask数据帧连接起来
- 使用
compute
方法将两个数据帧转换为 pandas 数据帧。如 (1) 所示加入他们
- 使用
compute
将通过from_delayed
方法生成的dask数据帧转换为pandas。 Aferwards 使用 from_pandas
将其转换回 dask。然后按照 (1) 中的方式加入。
考虑以下代码:
import dask.dataframe
import pandas as pd
# functions for generating a dask dataframe
def get_pdf(character):
'''constructs a pandas dataframe with indexes [character]1, ..., [character]5'''
index = [character + str(i) for i in range(5)]
return pd.DataFrame({'A':[1,2,3,4,5]}, index = index)
def get_ddf():
'''constructs dask dataframe out of pandas dataframes via the .from-delayed method with indexes A1, A2, A3, ... F3, F3, F4'''
delayed_list = [dask.delayed(get_pdf)(x) for x in 'ABCDEF']
return dask.dataframe.from_delayed(delayed_list)
#generate dask dataframes, that will be joined
ddf1 = get_ddf()
ddf2 = dask.dataframe.from_pandas(pd.DataFrame({'B': [1,2,3]}, index = ['A0', 'B1', 'C3']), npartitions = 2)
#recreate ddf1 by converting it to a pandas dataframe and afterwards to a dask dataframe
ddf1_from_pandas = dask.dataframe.from_pandas(ddf1.compute(), npartitions = 3)
#compute joins
dask_from_delayed_join = ddf1.join(ddf2, how = 'inner')
pandas_join = ddf1.compute().join(ddf2.compute(), how = 'inner')
dask_from_pandas_join = ddf1_from_pandas.join(ddf2, how = 'inner')
我希望所有三个结果(dask_from_delayed_join
、pandas_join
、dask_from_pandas_join
)都相同。
但是,第一个结果与其他结果不同:
print(dask_from_delayed_join.compute())
:
Empty DataFrame
Columns: [A, B]
Index: []
print(pandas_join)
:
A B
A0 1 1
B1 2 2
C3 4 3
print(dask_from_pandas_join.compute())
:
A B
A0 1 1
B1 2 2
C3 4 3
这是怎么回事?
dd.merge
确实存在一些问题。这些已在 dask 版本 0.10.2
中得到解决
In [10]: print(dask_from_delayed_join.compute())
A B
A0 1 1
B1 2 2
C3 4 3
In [11]: print(pandas_join)
A B
A0 1 1
B1 2 2
C3 4 3
In [12]: print(dask_from_pandas_join.compute())
A B
A0 1 1
B1 2 2
C3 4 3
将join
应用于由.from_delayed方法生成的dask数据帧时,我得到了意想不到的结果。我想通过以下示例来证明这一点,该示例由三部分组成。
- 通过
from_delayed
方法生成dask数据帧并将其与通过from_pandas
生成的dask数据帧连接起来
- 使用
compute
方法将两个数据帧转换为 pandas 数据帧。如 (1) 所示加入他们 - 使用
compute
将通过from_delayed
方法生成的dask数据帧转换为pandas。 Aferwards 使用from_pandas
将其转换回 dask。然后按照 (1) 中的方式加入。
考虑以下代码:
import dask.dataframe
import pandas as pd
# functions for generating a dask dataframe
def get_pdf(character):
'''constructs a pandas dataframe with indexes [character]1, ..., [character]5'''
index = [character + str(i) for i in range(5)]
return pd.DataFrame({'A':[1,2,3,4,5]}, index = index)
def get_ddf():
'''constructs dask dataframe out of pandas dataframes via the .from-delayed method with indexes A1, A2, A3, ... F3, F3, F4'''
delayed_list = [dask.delayed(get_pdf)(x) for x in 'ABCDEF']
return dask.dataframe.from_delayed(delayed_list)
#generate dask dataframes, that will be joined
ddf1 = get_ddf()
ddf2 = dask.dataframe.from_pandas(pd.DataFrame({'B': [1,2,3]}, index = ['A0', 'B1', 'C3']), npartitions = 2)
#recreate ddf1 by converting it to a pandas dataframe and afterwards to a dask dataframe
ddf1_from_pandas = dask.dataframe.from_pandas(ddf1.compute(), npartitions = 3)
#compute joins
dask_from_delayed_join = ddf1.join(ddf2, how = 'inner')
pandas_join = ddf1.compute().join(ddf2.compute(), how = 'inner')
dask_from_pandas_join = ddf1_from_pandas.join(ddf2, how = 'inner')
我希望所有三个结果(dask_from_delayed_join
、pandas_join
、dask_from_pandas_join
)都相同。
但是,第一个结果与其他结果不同:
print(dask_from_delayed_join.compute())
:
Empty DataFrame
Columns: [A, B]
Index: []
print(pandas_join)
:
A B
A0 1 1
B1 2 2
C3 4 3
print(dask_from_pandas_join.compute())
:
A B
A0 1 1
B1 2 2
C3 4 3
这是怎么回事?
dd.merge
确实存在一些问题。这些已在 dask 版本 0.10.2
In [10]: print(dask_from_delayed_join.compute())
A B
A0 1 1
B1 2 2
C3 4 3
In [11]: print(pandas_join)
A B
A0 1 1
B1 2 2
C3 4 3
In [12]: print(dask_from_pandas_join.compute())
A B
A0 1 1
B1 2 2
C3 4 3