How to change a value in a Koalas DataFrame based on a condition

I am using Koalas, and I want to change the value of a column based on a condition.

In pandas I can do that using:

import pandas as pd

df_test = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['one', 'two', 'three']})

df_test2 = pd.DataFrame({
    'c': [2, 1, 3],
    'd': ['one', 'two', 'three']})


df_test.loc[df_test.a.isin(df_test2['c']),'b'] = 'four'

df_test.head()

    a   b
0   1   four
1   2   four
2   3   four


I am trying to use the same approach in Koalas, but I get this error:

---------------------------------------------------------------------------
PandasNotImplementedError                 Traceback (most recent call last)
<ipython-input-15-814219258adb> in <module>
      5 new_loans['write_offs'] = 0
      6 
----> 7 new_loans.loc[(new_loans['ID'].isin(userinput_write_offs['id'])),'write_offs'] = 1
      8 new_loans.loc[new_loans['write_offs']==1,'is_active'] = 0
      9 new_loans = new_loans.sort_values(by = ['ZOHOID','Disb Date'])

/usr/local/lib/python3.7/dist-packages/databricks/koalas/base.py in isin(self, values)
    894             )
    895 
--> 896         return self._with_new_scol(self.spark.column.isin(list(values)))
    897 
    898     def isnull(self) -> Union["Series", "Index"]:

/usr/local/lib/python3.7/dist-packages/databricks/koalas/series.py in __iter__(self)
   5871 
   5872     def __iter__(self):
-> 5873         return MissingPandasLikeSeries.__iter__(self)
   5874 
   5875     if sys.version_info >= (3, 7):

/usr/local/lib/python3.7/dist-packages/databricks/koalas/missing/__init__.py in unsupported_function(*args, **kwargs)
     21     def unsupported_function(*args, **kwargs):
     22         raise PandasNotImplementedError(
---> 23             class_name=class_name, method_name=method_name, reason=reason
     24         )
     25 

PandasNotImplementedError: The method `pd.Series.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

How can I do the same operation in Koalas?

UPDATE

Following the question below, I did:

df_test.loc[df_test.a.isin(df_test2['c'].to_list()),'b'] = 'four'

But now I get this error:

---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
/usr/local/lib/python3.7/dist-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

/usr/local/lib/python3.7/dist-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

/usr/local/lib/python3.7/dist-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in __repr__(self)
  10614             return self._to_internal_pandas().to_string()
  10615 
> 10616         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  10617         pdf_length = len(pdf)
  10618         pdf = pdf.iloc[:max_display_count]

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  10606     def _get_or_create_repr_pandas_cache(self, n):
  10607         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
> 10608             self._repr_pandas_cache = {n: self.head(n + 1)._to_internal_pandas()}
  10609         return self._repr_pandas_cache[n]
  10610 

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in _to_internal_pandas(self)
  10602         This method is for internal use only.
  10603         """
> 10604         return self._internal.to_pandas_frame
  10605 
  10606     def _get_or_create_repr_pandas_cache(self, n):

/usr/local/lib/python3.7/dist-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    514     def wrapped_lazy_property(self):
    515         if not hasattr(self, attr_name):
--> 516             setattr(self, attr_name, fn(self))
    517         return getattr(self, attr_name)
    518 

/usr/local/lib/python3.7/dist-packages/databricks/koalas/internal.py in to_pandas_frame(self)
    807         """ Return as pandas DataFrame. """
    808         sdf = self.to_internal_spark_frame
--> 809         pdf = sdf.toPandas()
    810         if len(pdf) == 0 and len(sdf.schema) > 0:
    811             pdf = pdf.astype(

/usr/local/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    136 
    137         # Below is toPandas without Arrow optimization.
--> 138         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    139         column_counter = Counter(self.columns)
    140 

/usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
    594         """
    595         with SCCallSiteSync(self._sc) as css:
--> 596             sock_info = self._jdf.collectToPython()
    597         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    598 

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 589, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 1110, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'pandas'

Why is it trying to use pandas?

The Koalas package exposes a pandas-like API that is high level for the user, but under the hood the implementation is done with the PySpark APIs.
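To make that concrete, the same conditional update can be expressed directly with the PySpark API, which is roughly the kind of plan Koalas builds for you behind the scenes. A minimal sketch, assuming a SparkSession named spark and the toy data from the question:

from pyspark.sql import functions as F

sdf = spark.createDataFrame([(1, 'one'), (2, 'two'), (3, 'three')], ['a', 'b'])
sdf2 = spark.createDataFrame([(2, 'one'), (1, 'two'), (3, 'three')], ['c', 'd'])

# Collect the lookup values to the driver, then rewrite column 'b' with a
# conditional expression instead of label-based assignment.
values = [row['c'] for row in sdf2.select('c').distinct().collect()]
sdf = sdf.withColumn('b', F.when(F.col('a').isin(values), F.lit('four')).otherwise(F.col('b')))
sdf.show()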

I observed in the stack trace you pasted that a pandas DataFrame is being created from the Spark DataFrame sdf using the toPandas() method and assigned to pdf.

In the implementation of the toPandas() function, pandas and numpy are being imported.

Check line numbers 809 and 138:

/usr/local/lib/python3.7/dist-packages/databricks/koalas/internal.py in to_pandas_frame(self)
    807         """ Return as pandas DataFrame. """
    808         sdf = self.to_internal_spark_frame
--> 809         pdf = sdf.toPandas()
    810         if len(pdf) == 0 and len(sdf.schema) > 0:
    811             pdf = pdf.astype(

/usr/local/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    136 
    137         # Below is toPandas without Arrow optimization.
--> 138         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    139         column_counter = Counter(self.columns)
    140 

/usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
    594         """
    595         with SCCallSiteSync(self._sc) as css:
--> 596             sock_info = self._jdf.collectToPython()
    597         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    598 

You can check the implementation of the toPandas() function at the link below: https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py
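Since your traceback ends with ModuleNotFoundError: No module named 'pandas' raised inside the Python worker, pandas is apparently missing from the Python environment used by the worker nodes, even though it is available on the driver. A minimal sketch (assuming an active SparkContext sc, e.g. spark.sparkContext) to confirm that diagnosis by importing pandas on an executor:

def pandas_version(_):
    # Runs on a worker: raises ModuleNotFoundError there if pandas
    # is not installed in the workers' Python environment.
    import pandas
    return pandas.__version__

print(sc.parallelize([0]).map(pandas_version).collect())

If this fails with the same error, installing pandas (and numpy) into the workers' Python environment should resolve it.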