如何在 dask 数据框中将列转换为具有已知类别的 category 类型(即 `as_known()`)?
How to convert a column to a category dtype with known categories (`as_known()`) in a Dask DataFrame?
我正在尝试将列转换为类别以执行 pivot_table 操作。
我试过以下方法:
user_item_df = user_item.pivot_table(index='msno',
columns='song_id',
values='interacted',
aggfunc='mean')
我得到了这个:
ValueError Traceback (most recent call last)
<ipython-input-76-a870ece1f3e8> in <module>
2 columns='song_id',
3 values='interacted',
----> 4 aggfunc='mean')
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in pivot_table(self, index, columns, values, aggfunc)
3123 from .reshape import pivot_table
3124 return pivot_table(self, index=index, columns=columns, values=values,
-> 3125 aggfunc=aggfunc)
3126
3127 def to_records(self, index=False):
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/reshape.py in pivot_table(df, index, columns, values, aggfunc)
190 raise ValueError("'columns' must be the name of an existing column")
191 if not is_categorical_dtype(df[columns]):
--> 192 raise ValueError("'columns' must be category dtype")
193 if not has_known_categories(df[columns]):
194 raise ValueError("'columns' must have known categories. Please use "
ValueError: 'columns' must be category dtype
所以我尝试转换列:
user_item.song_id = user_item.song_id.astype('category')
但是我在调用 pivot_table 时得到了这个:
ValueError Traceback (most recent call last)
<ipython-input-78-a870ece1f3e8> in <module>
2 columns='song_id',
3 values='interacted',
----> 4 aggfunc='mean')
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in pivot_table(self, index, columns, values, aggfunc)
3123 from .reshape import pivot_table
3124 return pivot_table(self, index=index, columns=columns, values=values,
-> 3125 aggfunc=aggfunc)
3126
3127 def to_records(self, index=False):
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/reshape.py in pivot_table(df, index, columns, values, aggfunc)
192 raise ValueError("'columns' must be category dtype")
193 if not has_known_categories(df[columns]):
--> 194 raise ValueError("'columns' must have known categories. Please use "
195 "`df[columns].cat.as_known()` beforehand to ensure "
196 "known categories")
ValueError: 'columns' must have known categories. Please use `df[columns].cat.as_known()` beforehand to ensure known categories
然后我尝试了:
user_item.song_id = user_item.song_id.astype('category').cat.as_known()
我立即得到:
KeyError Traceback (most recent call last)
<timed exec> in <module>
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/categorical.py in as_known(self, **kwargs)
187 if self.known:
188 return self._series
--> 189 categories = self._property_map('categories').unique().compute(**kwargs)
190 return self.set_categories(categories.values)
191
~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
395 keys = [x.__dask_keys__() for x in collections]
396 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397 results = schedule(dsk, keys, **kwargs)
398 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
399
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2336 try:
2337 results = self.gather(packed, asynchronous=asynchronous,
-> 2338 direct=direct)
2339 finally:
2340 for f in futures.values():
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1660 return self.sync(self._gather, futures, errors=errors,
1661 direct=direct, local_worker=local_worker,
-> 1662 asynchronous=asynchronous)
1663
1664 @gen.coroutine
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
674 return future
675 else:
--> 676 return sync(self.loop, func, *args, **kwargs)
677
678 def __repr__(self):
~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
275 e.wait(10)
276 if error[0]:
--> 277 six.reraise(*error[0])
278 else:
279 return result[0]
~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in f()
260 if timeout is not None:
261 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262 result[0] = yield future
263 except Exception as exc:
264 error[0] = sys.exc_info()
~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1501 six.reraise(type(exception),
1502 exception,
-> 1503 traceback)
1504 if errors == 'skip':
1505 bad_keys.add(key)
~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
/home/pi/env/lib/python3.5/site-packages/dask/dataframe/core.py in apply_and_enforce()
KeyError: '_func'
我的 worker(工作节点)的输出是:
Exception: KeyError('_func',)
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x764b3c90>, <function unique at 0x6ef24a50>, [(<function apply_and_enforce at 0x6eeede88>, <function Accessor._delegate_property at 0x6ef28198>, [(<function apply_and_enforce at 0x6eeede88>, <methodcaller: astype>, [(<built-in function getitem>, (<function apply at 0x764b3c90>, <function partial_by_order at 0x762ebd20>, [ msno ... interacted
0 vDi/nHqBu7wb+DtI2Ix4TupWQatUEFR41mDC0c8Voh8= ... 1
1 3IGfhB6dtaYxEGm20yFtRxN7KoFZjzGJbXPSjsjW5cM= ... 1
2 4QugsKXr1pJXSBj6CbSYCF6O7QY2/MHGICUU16p3fig= ... 1
3 i4g6DQpmkTuRCS6/osUsQ8GSBJM8261is4Q04NDGRPk= ... 1
4 TTNNMisplhps4y5gdQ6rsv0++TIKOOIIZLz05W97vFU= ... 1
5 sDR8kS+t73zE9QM8D03Zw3mVrsRXc0Nih/WRl02sfZI= ... 1
6 yiGYGWyGrCYHlMOtPv65urw9RfdH43PNGzu8TRaO+m8= ... 1
7 7lXXPZLRbAPWE5ILi2BFQVEhYzPz9cwNvuzIVCuHfZY= ... 1
8 4clHF4wjaFgY6+nQWoXm1EEAvB
kwargs: {}
Exception: KeyError('_func',)
如果有人知道如何解决这个问题,那将对我有很大帮助。
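解决方法:在所有 worker(工作节点)、调度器(scheduler)和客户端上安装相同版本的 dask、distributed 和 dask-core。原因在回溯中可以看到:worker 运行在 /home/pi/env 下的 Python 3.5 环境中,而客户端使用的是 anaconda3 的 Python 3.6 环境。两端的库版本不一致,导致 worker 上的 `apply_and_enforce` 无法处理客户端发送的任务参数,从而抛出 `KeyError: '_func'`。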
我正在尝试将列转换为类别以执行 pivot_table 操作。
我试过以下方法:
user_item_df = user_item.pivot_table(index='msno',
columns='song_id',
values='interacted',
aggfunc='mean')
我得到了这个:
ValueError Traceback (most recent call last)
<ipython-input-76-a870ece1f3e8> in <module>
2 columns='song_id',
3 values='interacted',
----> 4 aggfunc='mean')
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in pivot_table(self, index, columns, values, aggfunc)
3123 from .reshape import pivot_table
3124 return pivot_table(self, index=index, columns=columns, values=values,
-> 3125 aggfunc=aggfunc)
3126
3127 def to_records(self, index=False):
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/reshape.py in pivot_table(df, index, columns, values, aggfunc)
190 raise ValueError("'columns' must be the name of an existing column")
191 if not is_categorical_dtype(df[columns]):
--> 192 raise ValueError("'columns' must be category dtype")
193 if not has_known_categories(df[columns]):
194 raise ValueError("'columns' must have known categories. Please use "
ValueError: 'columns' must be category dtype
所以我尝试转换列:
user_item.song_id = user_item.song_id.astype('category')
但是我在调用 pivot_table 时得到了这个:
ValueError Traceback (most recent call last)
<ipython-input-78-a870ece1f3e8> in <module>
2 columns='song_id',
3 values='interacted',
----> 4 aggfunc='mean')
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in pivot_table(self, index, columns, values, aggfunc)
3123 from .reshape import pivot_table
3124 return pivot_table(self, index=index, columns=columns, values=values,
-> 3125 aggfunc=aggfunc)
3126
3127 def to_records(self, index=False):
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/reshape.py in pivot_table(df, index, columns, values, aggfunc)
192 raise ValueError("'columns' must be category dtype")
193 if not has_known_categories(df[columns]):
--> 194 raise ValueError("'columns' must have known categories. Please use "
195 "`df[columns].cat.as_known()` beforehand to ensure "
196 "known categories")
ValueError: 'columns' must have known categories. Please use `df[columns].cat.as_known()` beforehand to ensure known categories
然后我尝试了:
user_item.song_id = user_item.song_id.astype('category').cat.as_known()
我立即得到:
KeyError Traceback (most recent call last)
<timed exec> in <module>
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/categorical.py in as_known(self, **kwargs)
187 if self.known:
188 return self._series
--> 189 categories = self._property_map('categories').unique().compute(**kwargs)
190 return self.set_categories(categories.values)
191
~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
395 keys = [x.__dask_keys__() for x in collections]
396 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397 results = schedule(dsk, keys, **kwargs)
398 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
399
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2336 try:
2337 results = self.gather(packed, asynchronous=asynchronous,
-> 2338 direct=direct)
2339 finally:
2340 for f in futures.values():
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1660 return self.sync(self._gather, futures, errors=errors,
1661 direct=direct, local_worker=local_worker,
-> 1662 asynchronous=asynchronous)
1663
1664 @gen.coroutine
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
674 return future
675 else:
--> 676 return sync(self.loop, func, *args, **kwargs)
677
678 def __repr__(self):
~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
275 e.wait(10)
276 if error[0]:
--> 277 six.reraise(*error[0])
278 else:
279 return result[0]
~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in f()
260 if timeout is not None:
261 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262 result[0] = yield future
263 except Exception as exc:
264 error[0] = sys.exc_info()
~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
~/anaconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1501 six.reraise(type(exception),
1502 exception,
-> 1503 traceback)
1504 if errors == 'skip':
1505 bad_keys.add(key)
~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
/home/pi/env/lib/python3.5/site-packages/dask/dataframe/core.py in apply_and_enforce()
KeyError: '_func'
我的 worker(工作节点)的输出是:
Exception: KeyError('_func',)
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x764b3c90>, <function unique at 0x6ef24a50>, [(<function apply_and_enforce at 0x6eeede88>, <function Accessor._delegate_property at 0x6ef28198>, [(<function apply_and_enforce at 0x6eeede88>, <methodcaller: astype>, [(<built-in function getitem>, (<function apply at 0x764b3c90>, <function partial_by_order at 0x762ebd20>, [ msno ... interacted
0 vDi/nHqBu7wb+DtI2Ix4TupWQatUEFR41mDC0c8Voh8= ... 1
1 3IGfhB6dtaYxEGm20yFtRxN7KoFZjzGJbXPSjsjW5cM= ... 1
2 4QugsKXr1pJXSBj6CbSYCF6O7QY2/MHGICUU16p3fig= ... 1
3 i4g6DQpmkTuRCS6/osUsQ8GSBJM8261is4Q04NDGRPk= ... 1
4 TTNNMisplhps4y5gdQ6rsv0++TIKOOIIZLz05W97vFU= ... 1
5 sDR8kS+t73zE9QM8D03Zw3mVrsRXc0Nih/WRl02sfZI= ... 1
6 yiGYGWyGrCYHlMOtPv65urw9RfdH43PNGzu8TRaO+m8= ... 1
7 7lXXPZLRbAPWE5ILi2BFQVEhYzPz9cwNvuzIVCuHfZY= ... 1
8 4clHF4wjaFgY6+nQWoXm1EEAvB
kwargs: {}
Exception: KeyError('_func',)
如果有人知道如何解决这个问题,那将对我有很大帮助。
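解决方法:在所有 worker(工作节点)、调度器(scheduler)和客户端上安装相同版本的 dask、distributed 和 dask-core。原因在回溯中可以看到:worker 运行在 /home/pi/env 下的 Python 3.5 环境中,而客户端使用的是 anaconda3 的 Python 3.6 环境。两端的库版本不一致,导致 worker 上的 `apply_and_enforce` 无法处理客户端发送的任务参数,从而抛出 `KeyError: '_func'`。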