How can I solve AttributeError: 'RDD' object has no attribute '_get_object_id' when using UDF?

I have the following code:

from pyspark.sql.functions import lit
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

def aa(a, b):
    if (a == 1):
        return 3
    else:
        return 6

example_dataframe = sqlContext.createDataFrame([(1, 1), (2, 2)], ['a', 'b'])
example_dataframe.show()
af = UserDefinedFunction(lambda (line_a, line_b): aa(line_a, line_b), StringType())
a = af(example_dataframe.rdd)
print(a)
example_dataframe.withColumn('c',lit(a))
example_dataframe.show()

I want to generate a new column based on a condition over other attributes. I know I could express the condition directly in a "withColumn" clause, but I want to try doing it with a UDF.

I get the following error:

Traceback (most recent call last):
  File "/var/folders/vs/lk870p4x449gmqrtyz9hdry40000gn/T/zeppelin_pyspark-2901893392381883952.py", line 349, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/var/folders/vs/lk870p4x449gmqrtyz9hdry40000gn/T/zeppelin_pyspark-2901893392381883952.py", line 337, in <module>
    exec(code)
  File "<stdin>", line 9, in <module>
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/functions.py", line 1848, in __call__
    jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 59, in _to_seq
    cols = [converter(c) for c in cols]
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 47, in _to_java_column
    jcol = _create_column_from_name(col)
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/column.py", line 40, in _create_column_from_name
    return sc._jvm.functions.col(name)
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1124, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1094, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/Users/javier/Downloads/Apache_ZEPPELIN/zeppelin-0.7.1-bin-all/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RDD' object has no attribute '_get_object_id'

How can I pass the attribute values into the UDF?

You have to pass the DataFrame's columns to the UDF, not the DataFrame (or its RDD) itself.

>>> from pyspark.sql.types import *
>>> example_dataframe.show()
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  2|  2|
+---+---+
>>> af = UserDefinedFunction(lambda line_a, line_b: aa(line_a, line_b), StringType())
>>> example_dataframe.withColumn('c', af(example_dataframe['a'], example_dataframe['b'])).show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  3|
|  2|  2|  6|
+---+---+---+
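
For reference, here is a minimal sketch (not part of the original answer) of the same idea written against a SparkSession named spark rather than the older sqlContext, using the udf() helper with an IntegerType() return type, since aa() actually returns integers. It also shows the built-in when/otherwise expression the question mentions as the non-UDF alternative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, when, col
from pyspark.sql.types import IntegerType

# Assumes a SparkSession is available (e.g. inside Zeppelin or spark-submit).
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1), (2, 2)], ['a', 'b'])

def aa(a, b):
    return 3 if a == 1 else 6

# Wrap the plain Python function as a UDF and pass Column objects, not the RDD.
aa_udf = udf(aa, IntegerType())
df.withColumn('c', aa_udf(col('a'), col('b'))).show()

# Equivalent without a UDF: a when/otherwise expression stays in the JVM
# and can be optimized by Catalyst.
df.withColumn('c', when(col('a') == 1, 3).otherwise(6)).show()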