Custom UDF written in Pyspark giving output as "java.lang.Object"

I have written a custom transformer class POSWordTagger. The code of my _transform() method is:

def _transform(self, dataset):

    def f(s):
        tokens = nltk.tokenize.wordpunct_tokenize(s)
        pos_tags = nltk.pos_tag(tokens)
        return pos_tags

    t = ArrayType(StringType())
    out_col = self.getOutputCol()
    in_col = dataset[self.getInputCol()]
    return dataset.withColumn(out_col, udf(f, t)(in_col))

I call my transformer class like this:

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

sentenceDataFrame = sqlContext.createDataFrame([
  (0, "Hi I heard about Spark"),
  (0, "I wish Java could use case classes"),
  (1, "Logistic regression models are neat")
  ], ["label", "sentence"])

pos_tagger = POSWordTagger(inputCol="sentence", outputCol="pos")

pos_output=pos_tagger.transform(sentenceDataFrame)
pos_output.select("pos").show()

The output I get is:

+--------------------+
|                 pos|
+--------------------+
|[[Ljava.lang.Obje...|
|[[Ljava.lang.Obje...|
|[[Ljava.lang.Obje...|
+--------------------+

Even though I pass the schema as ArrayType(StringType()), I get object references as output. However, if I return only tokens from the _transform() method instead of pos_tags, I get the output correctly, i.e. the list of tokens. Can anyone point out what I am missing or doing wrong? Any help is appreciated. My environment is Spark 1.6 and Python 2.7.

Look at the example below: pos_tag returns a list of tuples of strings:

>>> text = word_tokenize("And now for something completely different")
>>> nltk.pos_tag(text)
[('And', 'CC'), ('now', 'RB'), ('for', 'IN'), ('something', 'NN'),
('completely', 'RB'), ('different', 'JJ')]

So the problem in your code lies here: ArrayType(StringType()). Each element of the returned list is a (word, tag) tuple rather than a single string, so the schema should be ArrayType(ArrayType(StringType())).
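A minimal pure-Python sketch (no Spark or NLTK needed) of why the schema matters: each tagged token is a (word, tag) pair, so the UDF's return value is nested one level deeper than ArrayType(StringType()) expects. The sample tags below are illustrative, not actual nltk output:

```python
# Simulated output of nltk.pos_tag: a list of (word, tag) tuples.
pos_tags = [('Hi', 'NNP'), ('I', 'PRP'), ('heard', 'VBD'),
            ('about', 'IN'), ('Spark', 'NNP')]

# ArrayType(StringType()) expects a flat list of strings, but each
# element here is a pair; Spark 1.6 cannot coerce a tuple to a string,
# hence the opaque "[Ljava.lang.Obje..." rendering in show().
# ArrayType(ArrayType(StringType())) matches the actual nesting,
# which serializes each pair as an inner array of two strings:
as_nested = [list(pair) for pair in pos_tags]
print(as_nested[:2])  # [['Hi', 'NNP'], ['I', 'PRP']]
```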

###### Answering the comment
import pyspark.sql.types as T
import pyspark.sql.functions as F

# reduce is a builtin on Python 2.7; on Python 3 import it from functools
def flattenArray(obj):
    return reduce(lambda x, y: x + y, obj)

pos_output.select(F.udf(flattenArray, T.ArrayType(T.StringType()))("pos").alias("pos")).show(truncate=False)
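The flattening itself is plain Python; a quick sketch of what the UDF does to one row's nested "pos" value (no Spark required, sample row invented for illustration):

```python
from functools import reduce  # builtin on Python 2, needed on Python 3

def flatten_array(obj):
    # Concatenate the inner [word, tag] lists into one flat list of strings.
    return reduce(lambda x, y: x + y, obj)

row = [['Hi', 'NNP'], ['I', 'PRP'], ['heard', 'VBD']]
print(flatten_array(row))  # ['Hi', 'NNP', 'I', 'PRP', 'heard', 'VBD']
```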