Cannot assign a koalas Series as a new column in koalas
I cannot assign a Series as a new column to a koalas DataFrame. Here is the code I am using:
from databricks import koalas
dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft.assign(c=koalas.Series([1,2,3]))
Output:
AnalysisException Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
700 type_pprinters=self.type_printers,
701 deferred_pprinters=self.deferred_printers)
--> 702 printer.pretty(obj)
703 printer.flush()
704 return stream.getvalue()
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in pretty(self, obj)
392 if cls is not object \
393 and callable(cls.__dict__.get('__repr__')):
--> 394 return _repr_pprint(obj, self, cycle)
395
396 return _default_pprint(obj, self, cycle)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
698 """A pprint that just redirects to the normal repr function."""
699 # Find newlines and replace them with p.break_()
--> 700 output = repr(obj)
701 lines = output.splitlines()
702 with p.group():
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in __repr__(self)
11661 return self._to_internal_pandas().to_string()
11662
> 11663 pdf = self._get_or_create_repr_pandas_cache(max_display_count)
11664 pdf_length = len(pdf)
11665 pdf = pdf.iloc[:max_display_count]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
11652 if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
11653 object.__setattr__(
> 11654 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
11655 )
11656 return self._repr_pandas_cache[n]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
5748 return DataFrame(self._internal.with_filter(F.lit(False)))
5749 else:
-> 5750 sdf = self._internal.resolved_copy.spark_frame
5751 if get_option("compute.ordered_head"):
5752 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
576 def wrapped_lazy_property(self):
577 if not hasattr(self, attr_name):
--> 578 setattr(self, attr_name, fn(self))
579 return getattr(self, attr_name)
580
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
1066 def resolved_copy(self) -> "InternalFrame":
1067 """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068 sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
1069 return self.copy(
1070 spark_frame=sdf,
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
1683 [Row(name='Alice', age=12), Row(name='Bob', age=15)]
1684 """
-> 1685 jdf = self._jdf.select(self._jcols(*cols))
1686 return DataFrame(jdf, self.sql_ctx)
1687
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
+- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
343 method = get_real_method(obj, self.print_method)
344 if method is not None:
--> 345 return method()
346 return None
347 else:
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _repr_html_(self)
11684 return self._to_internal_pandas().to_html(notebook=True, bold_rows=bold_rows)
11685
> 11686 pdf = self._get_or_create_repr_pandas_cache(max_display_count)
11687 pdf_length = len(pdf)
11688 pdf = pdf.iloc[:max_display_count]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
11652 if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
11653 object.__setattr__(
> 11654 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
11655 )
11656 return self._repr_pandas_cache[n]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
5748 return DataFrame(self._internal.with_filter(F.lit(False)))
5749 else:
-> 5750 sdf = self._internal.resolved_copy.spark_frame
5751 if get_option("compute.ordered_head"):
5752 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
576 def wrapped_lazy_property(self):
577 if not hasattr(self, attr_name):
--> 578 setattr(self, attr_name, fn(self))
579 return getattr(self, attr_name)
580
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
1066 def resolved_copy(self) -> "InternalFrame":
1067 """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068 sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
1069 return self.copy(
1070 spark_frame=sdf,
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
1683 [Row(name='Alice', age=12), Row(name='Bob', age=15)]
1684 """
-> 1685 jdf = self._jdf.select(self._jcols(*cols))
1686 return DataFrame(jdf, self.sql_ctx)
1687
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
+- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false
Can you help me understand what is wrong with my approach and how to assign a new column to a koalas DataFrame?
Honestly, I don't know why assign raises an error, but one way to add a new column to a koalas.DataFrame is to use the standard assignment syntax [''] as shown below.
The important part is changing the option compute.ops_on_diff_frames to allow operations on different Series/DataFrames.
import databricks.koalas as ks

ks.set_option('compute.ops_on_diff_frames', True)

dft = ks.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
dft['c'] = ks.Series([1, 2, 3])   # the import is aliased to ks, so use ks.Series here
dft
#    a  b  c
# 0  1  3  1
# 1  2  4  2
# 2  3  5  3
Unfortunately, inside the assign method you can only use expressions over the existing columns of the dataframe.
Explanation
The important part of the error stack is the Spark execution plan:
AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
+- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false
In a Spark execution plan, a Project can be translated into a SQL SELECT. You can see that the plan fails at the second Project (Spark execution plans are read from bottom to top), because it cannot find the column 0#991184L (the one you want to add to your dft dataframe) among the columns __index_level_0__#991164L, a#991165L, b#991166L and __natural_order__#991170L of the dft dataframe.
Indeed, the column 0#991184L comes from the Series you created on the fly, not from a Series of your dft dataframe. For Spark this means the column belongs to another dataframe, so it obviously cannot be retrieved from the dft dataframe with a SELECT, which is exactly what Spark is trying to do.
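The same failure can be reproduced with plain PySpark. A minimal sketch, assuming a local Spark session (df1 and df2 are made-up names): selecting a Column object that belongs to a different DataFrame triggers the same kind of AnalysisException as the plan above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, 3), (2, 4), (3, 5)], ['a', 'b'])
df2 = spark.createDataFrame([(1,), (2,), (3,)], ['c'])

df1.select(df1['a'], df1['b']).show()   # fine: both columns belong to df1
df1.select(df1['a'], df2['c']).show()   # AnalysisException: df2['c'] is not a column of df1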
To link the pandas and Spark APIs: the Spark equivalent of assign would be the withColumn Spark dataframe method, whose documentation states:
The column expression must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.
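A minimal PySpark sketch of that rule (df and other are made-up names): withColumn accepts an expression built from the same DataFrame's columns, but rejects a column taken from another DataFrame.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 3), (2, 4), (3, 5)], ['a', 'b'])
other = spark.createDataFrame([(10,), (20,), (30,)], ['x'])

df.withColumn('c', df['a'] * 2).show()   # works: expression over this DataFrame
df.withColumn('c', other['x']).show()    # fails: column comes from another DataFrame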
Note: strictly speaking, the Spark equivalent of assign is the select function rather than withColumn, since withColumn is restricted to adding a single column, but the restriction quoted above applies to select as well.
So assign works in the following cases:
dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
# dft['a'] and dft['b'] are koalas Series
dft.assign(c=dft['a'])
dft.assign(d=dft['a']*2)
dft.assign(e=dft['a']*dft['b'])
But it does not work in the following cases:
import pandas as pd

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
dft.assign(c=koalas.Series([1, 2, 3]))

dft2 = pd.DataFrame({'d': [1, 2, 3]})
# dft2['d'] is a pandas Series, so it does not belong to dft either
dft.assign(d=dft2['d'])
Workaround
The workaround here is to do as described in the answer above and assign the column with dft['c'] = koalas.Series([1,2,3]).
It works because in this case Spark will join the two dataframes instead of just selecting columns from the first one. Since a join, hidden here by the koalas API, can be a very expensive operation in Spark, you have to override the guardrail by setting compute.ops_on_diff_frames to True.
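For intuition, the operation that koalas hides here corresponds roughly to an index-aligned join of the two underlying Spark DataFrames, along the lines of this sketch (the idx column stands in for koalas' internal index column; the real internal column names differ):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame([(0, 1, 3), (1, 2, 4), (2, 3, 5)], ['idx', 'a', 'b'])
right = spark.createDataFrame([(0, 1), (1, 2), (2, 3)], ['idx', 'c'])

# Roughly what assigning the new column has to do under the hood:
# align the two frames on their index with a join, which may shuffle data.
joined = left.join(right, on='idx', how='left').orderBy('idx')
joined.show()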
Setting compute.ops_on_diff_frames to true simply tells koalas "I acknowledge that this operation is a join and may perform poorly". You can reset the option to its default value after performing the operation with koalas.reset_option('compute.ops_on_diff_frames').
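Putting it together, a minimal sketch of the full workaround, restoring the option afterwards:

import databricks.koalas as ks

ks.set_option('compute.ops_on_diff_frames', True)

dft = ks.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
dft['c'] = ks.Series([1, 2, 3])   # koalas turns this into an index-aligned join

ks.reset_option('compute.ops_on_diff_frames')
dft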