如何在 dask 数据框中使用 'as_known()' 将列转换为具有已知类别的 category 类型?

How to convert column into category 'as_known()' with dask dataframe?

我正在尝试将列转换为类别以执行 pivot_table 操作。

我试过以下方法:

user_item_df = user_item.pivot_table(index='msno',
                                     columns='song_id',
                                     values='interacted',
                                     aggfunc='mean')

我得到了这个:

ValueError                                Traceback (most recent call last)
<ipython-input-76-a870ece1f3e8> in <module>
      2                                      columns='song_id',
      3                                      values='interacted',
----> 4                                      aggfunc='mean')

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in pivot_table(self, index, columns, values, aggfunc)
   3123         from .reshape import pivot_table
   3124         return pivot_table(self, index=index, columns=columns, values=values,
-> 3125                            aggfunc=aggfunc)
   3126 
   3127     def to_records(self, index=False):

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/reshape.py in pivot_table(df, index, columns, values, aggfunc)
    190         raise ValueError("'columns' must be the name of an existing column")
    191     if not is_categorical_dtype(df[columns]):
--> 192         raise ValueError("'columns' must be category dtype")
    193     if not has_known_categories(df[columns]):
    194         raise ValueError("'columns' must have known categories. Please use "

ValueError: 'columns' must be category dtype

所以我尝试转换列: user_item.song_id = user_item.song_id.astype('category')

但是我在调用 pivot_table 时得到了这个:

ValueError                                Traceback (most recent call last)
<ipython-input-78-a870ece1f3e8> in <module>
      2                                      columns='song_id',
      3                                      values='interacted',
----> 4                                      aggfunc='mean')

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in pivot_table(self, index, columns, values, aggfunc)
   3123         from .reshape import pivot_table
   3124         return pivot_table(self, index=index, columns=columns, values=values,
-> 3125                            aggfunc=aggfunc)
   3126 
   3127     def to_records(self, index=False):

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/reshape.py in pivot_table(df, index, columns, values, aggfunc)
    192         raise ValueError("'columns' must be category dtype")
    193     if not has_known_categories(df[columns]):
--> 194         raise ValueError("'columns' must have known categories. Please use "
    195                          "`df[columns].cat.as_known()` beforehand to ensure "
    196                          "known categories")

ValueError: 'columns' must have known categories. Please use `df[columns].cat.as_known()` beforehand to ensure known categories

然后我尝试了:

user_item.song_id = user_item.song_id.astype('category').cat.as_known()

我立即得到:

KeyError                                  Traceback (most recent call last)
<timed exec> in <module>

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/categorical.py in as_known(self, **kwargs)
    187         if self.known:
    188             return self._series
--> 189         categories = self._property_map('categories').unique().compute(**kwargs)
    190         return self.set_categories(categories.values)
    191 

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    395     keys = [x.__dask_keys__() for x in collections]
    396     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397     results = schedule(dsk, keys, **kwargs)
    398     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    399 

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2336             try:
   2337                 results = self.gather(packed, asynchronous=asynchronous,
-> 2338                                       direct=direct)
   2339             finally:
   2340                 for f in futures.values():

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1660             return self.sync(self._gather, futures, errors=errors,
   1661                              direct=direct, local_worker=local_worker,
-> 1662                              asynchronous=asynchronous)
   1663 
   1664     @gen.coroutine

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    674             return future
    675         else:
--> 676             return sync(self.loop, func, *args, **kwargs)
    677 
    678     def __repr__(self):

~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1501                             six.reraise(type(exception),
   1502                                         exception,
-> 1503                                         traceback)
   1504                     if errors == 'skip':
   1505                         bad_keys.add(key)

~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    683             value = tp()
    684         if value.__traceback__ is not tb:
--> 685             raise value.with_traceback(tb)
    686         raise value
    687 

/home/pi/env/lib/python3.5/site-packages/dask/dataframe/core.py in apply_and_enforce()

KeyError: '_func'

我的 worker 的输出是:

Exception: KeyError('_func',)
distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x764b3c90>, <function unique at 0x6ef24a50>, [(<function apply_and_enforce at 0x6eeede88>, <function Accessor._delegate_property at 0x6ef28198>, [(<function apply_and_enforce at 0x6eeede88>, <methodcaller: astype>, [(<built-in function getitem>, (<function apply at 0x764b3c90>, <function partial_by_order at 0x762ebd20>, [                                                msno  ... interacted
0       vDi/nHqBu7wb+DtI2Ix4TupWQatUEFR41mDC0c8Voh8=  ...          1
1       3IGfhB6dtaYxEGm20yFtRxN7KoFZjzGJbXPSjsjW5cM=  ...          1
2       4QugsKXr1pJXSBj6CbSYCF6O7QY2/MHGICUU16p3fig=  ...          1
3       i4g6DQpmkTuRCS6/osUsQ8GSBJM8261is4Q04NDGRPk=  ...          1
4       TTNNMisplhps4y5gdQ6rsv0++TIKOOIIZLz05W97vFU=  ...          1
5       sDR8kS+t73zE9QM8D03Zw3mVrsRXc0Nih/WRl02sfZI=  ...          1
6       yiGYGWyGrCYHlMOtPv65urw9RfdH43PNGzu8TRaO+m8=  ...          1
7       7lXXPZLRbAPWE5ILi2BFQVEhYzPz9cwNvuzIVCuHfZY=  ...          1
8       4clHF4wjaFgY6+nQWoXm1EEAvB
kwargs:    {}
Exception: KeyError('_func',)

如果有人知道如何解决这个问题,那将对我有很大帮助。

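通过在所有 worker、调度器(scheduler)和客户端上安装相同版本的 dask、distributed 和 dask-core 解决了这个问题。(从上面的回溯可以看出,客户端运行在 Python 3.6 环境中,而 worker 运行在 Python 3.5 环境中,两边的环境并不一致。)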