PySpark PythonUDF 缺少输入属性
PySpark PythonUDF Missing input attributes
我正在尝试使用 Spark SQL Data Frame 来读取一些数据并对每一行应用一堆文本清理函数。
import langid
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql import HiveContext
hsC = HiveContext(sc)
df = hsC.sql("select * from sometable")
def check_lang(data_str):
language = langid.classify(data_str)
# only english
record = ''
if language[0] == 'en':
# probability of correctly id'ing the language greater than 90%
if language[1] > 0.9:
record = data_str
return record
check_lang_udf = udf(lambda x: check_lang(x), StringType())
clean_df = df.select("Field1", check_lang_udf("TextField"))
然而,当我尝试 运行 时,出现以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o31.select.
: java.lang.AssertionError: assertion failed: Unable to evaluate PythonUDF. Missing input attributes
我花了很多时间试图收集更多关于此的信息,但我找不到任何东西。
作为旁注,我知道下面的代码有效,但我想继续使用数据帧。
removeNonEn = data.map(lambda record: (record[0], check_lang(record[1])))
我还没有尝试过这段代码,但是 API 文档表明这应该有效:
hsC.registerFunction("check_lang", check_lang)
clean_df = df.selectExpr("Field1", "check_lang('TextField')")
我正在尝试使用 Spark SQL Data Frame 来读取一些数据并对每一行应用一堆文本清理函数。
import langid
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql import HiveContext
hsC = HiveContext(sc)
df = hsC.sql("select * from sometable")
def check_lang(data_str):
language = langid.classify(data_str)
# only english
record = ''
if language[0] == 'en':
# probability of correctly id'ing the language greater than 90%
if language[1] > 0.9:
record = data_str
return record
check_lang_udf = udf(lambda x: check_lang(x), StringType())
clean_df = df.select("Field1", check_lang_udf("TextField"))
然而,当我尝试 运行 时,出现以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o31.select.
: java.lang.AssertionError: assertion failed: Unable to evaluate PythonUDF. Missing input attributes
我花了很多时间试图收集更多关于此的信息,但我找不到任何东西。
作为旁注,我知道下面的代码有效,但我想继续使用数据帧。
removeNonEn = data.map(lambda record: (record[0], check_lang(record[1])))
我还没有尝试过这段代码,但是 API 文档表明这应该有效:
hsC.registerFunction("check_lang", check_lang)
clean_df = df.selectExpr("Field1", "check_lang('TextField')")