将 pandas 转换为 dask 代码并出错

Convert pandas to dask code and it errors out

我有 pandas 完美运行的代码。

import pandas as pd                                              
                                                                 
courses_df = pd.DataFrame(                                       
    [                                                            
        ["Jay", "MS"],                                           
        ["Jay", "Music"],                                        
        ["Dorsey", "Music"],                                     
        ["Dorsey", "Piano"],                                     
        ["Mark", "MS"],                                          
    ],                                                           
    columns=["Name", "Course"],                                  
)                                                                
                                                                 
pandas_df_json = (                                               
    courses_df.groupby(["Name"])                                 
    .apply(lambda x: x.drop(columns="Name").to_json(orient="records"))                
    .reset_index(name="courses_json")                            
)                    

但是当我将数据帧转换为 Dask 并尝试相同的操作时。

from dask import dataframe as dd  
df = dd.from_pandas(courses_df, npartitions=2)                                                           
df.groupby(["Name"]).apply(lambda x: x.to_json(orient="records")).reset_index(                           
    name="courses_json"                                                                                  
).compute() 

我得到的错误是

UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  df.groupby(["Name"]).apply(lambda x: x.to_json(orient="records")).reset_index(
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [37], in <module>
      1 from dask import dataframe as dd
      3 df = dd.from_pandas(courses_df, npartitions=2)
----> 4 df.groupby(["Name"]).apply(lambda x: x.drop(columns="Name").to_json(orient="records")).reset_index(
      5     name="courses_json"
      6 ).compute()

TypeError: _Frame.reset_index() got an unexpected keyword argument 'name'

                                                                                         

我对 dask 和 pandas 的预期输出应该是相同的

     Name                             courses_json
0  Dorsey  [{"Course":"Music"},{"Course":"Piano"}]
1     Jay     [{"Course":"MS"},{"Course":"Music"}]
2    Mark                        [{"Course":"MS"}]

我如何在 dask 中实现这个?

到目前为止我的尝试

from dask import dataframe as dd                                             
                                                                             
df = dd.from_pandas(courses_df, npartitions=2)                               
df.groupby(["Name"]).apply(                                                  
    lambda x: x.drop(columns="Name").to_json(orient="records")               
).compute()                           
UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.                
  Before: .apply(func)                                                                                                                                                 
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result                                                                                               
  or:     .apply(func, meta=('x', 'f8'))            for series result                                                                                                  
  df.groupby(["Name"]).apply(                                                                                                                                          
Out[57]:                                                                                                                                                               
Name                                                                                                                                                                   
Dorsey    [{"Course":"Piano"},{"Course":"Music"}]                                                                                                                      
Jay          [{"Course":"MS"},{"Course":"Music"}]                                                                                                                      
Mark                            [{"Course":"MS"}]                                                                                                                      
dtype: object                     

我想传递元论证,还想要第二列

要有一个像courses_json

这样有意义的名字

对于 meta 警告,Dask 希望您为结果指定列数据类型。它是可选的,但如果您不指定它,Dask 完全有可能推断出错误的数据类型。例如,可以将一个分区推断为 int 类型,将另一个分区推断为 float 类型。对于稀疏数据集尤其如此。有关详细信息,请参阅文档页面:

https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.apply.html

这应该可以解决警告:

from dask import dataframe as dd                                             
                                                                             
df = dd.from_pandas(courses_df, npartitions=2)                               
new_df = df.groupby(["Name"]).apply(
     lambda x: x.drop(columns="Name").to_json(orient="records"),
     meta=("Name", "O") 
).to_frame()

# rename columns
new_df.columns = ["courses_json"]

# use numeric int index instead of name as in the given example 
new_df = new_df.reset_index()

new_df.compute()

你的计算结果是一个 dask Series,而不是一个 Dataframe。这就是为什么你需要在这里使用 numpy 类型 (https://www.w3schools.com/python/numpy/numpy_data_types.asp)。它由一个索引和一个值组成。如果不使用 .to_frame() 方法将第二列转换回数据框,您将无法直接命名第二列。