How do I create a new column in my dataframe based on values contained in two different lists?

I have a pyspark dataframe that looks like this:

+--------------------+--------------------+
|               label|           sentences|
+--------------------+--------------------+
|[things, we, eati...|<p>I am construct...|
|[elephants, nordi...|<p><strong>Edited...|
|[bee, cross-entro...|<p>I have a data ...|
|[milking, markers...|<p>There is an Ma...|
|[elephants, tease...|<p>I have Score d...|
|[references, gene...|<p>I'm looking fo...|
|[machines, exitin...|<p>I applied SVM ...|
+--------------------+--------------------+

And a top_ten list like this:

['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']

I need to create a new_label column with a value of 1.0 if at least one of the label values is also in the top_ten list (for each row, of course), and 0.0 otherwise.

The logic makes sense to me, but I don't have much experience with the syntax. Surely there's a short answer to this?

I've tried this:

temp = train_df.withColumn('label', F.when(lambda x: x.isin(top_ten), 1.0).otherwise(0.0))

And this:

def matching_top_ten(top_ten, labels):
    for label in labels:
        if label.isin(top_ten):
            return 1.0
        else:
            return 0.0

After the last attempt I learned that these functions can't be mapped onto a dataframe. So I figured I could convert the column to an RDD, map it, and then .join() it back, but that sounds unnecessarily tedious.
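(Roughly something like the following sketch, assuming a synthetic row id to join back on, which is exactly the kind of detour I would rather avoid:)

import pyspark.sql.functions as F

# Tag each row with an id, flag the label arrays on the RDD side, then join back
with_id = train_df.withColumn('id', F.monotonically_increasing_id())
flags = (with_id
         .select('id', 'label').rdd
         .map(lambda row: (row['id'], 1.0 if any(l in top_ten for l in row['label']) else 0.0))
         .toDF(['id', 'new_label']))
temp = with_id.join(flags, on='id').drop('id')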

**Update:** I also tried the above function as a UDF, with no luck there either...

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
matching_udf = udf(matching_top_ten, FloatType())
temp = train_df.select('label', matching_udf(top_ten, 'label').alias('new_labels'))
----
TypeError: Invalid argument, not a string or column: [...top_ten list values...] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

I've found other similar questions on SO; however, none of them involve the logic of checking a list against another list (at best, a single value against a list).

You can create a new column holding the top-ten list as an array, split the sentence column into an array of separate words, and then apply a udf in the following way:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

top_ten_list = ['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']
df = df.withColumn("top_ten_list", F.array([F.lit(x) for x in top_ten_list]))

def matching_top_ten(normal_string, top_ten_ls):
    # return 1 if the two arrays share at least one word, else 0
    if len(set(normal_string).intersection(set(top_ten_ls))) > 0:
        return 1
    return 0

matching_top_ten_udf = F.udf(matching_top_ten, IntegerType())

df = df.withColumn("label_flag", matching_top_ten_udf(F.col("label"), F.col("top_ten_list")))
df = df.withColumn("split_sentence", F.split("sentence", " ")).withColumn("label_flag", matching_top_ten_udf(F.col("split_sentence"), F.col("top_ten_list")))

You can skip the first step, since I see you already have the top_ten_list as label.
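For instance, here is a minimal sketch of that shortcut (it closes over the Python top_ten_list instead of adding it as an array column, so the udf only needs the label column):

# Hypothetical shortcut: close over the Python list, no top_ten_list column required
matching_label_udf = F.udf(lambda labels: matching_top_ten(labels, top_ten_list), IntegerType())
df = df.withColumn("label_flag", matching_label_udf(F.col("label")))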

Sample output from the df I used (different schema from yours):

  customer  Month  year  spend        ls1                    sentence                      sentence_split  label
0        a     11  2018   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
1        a     12  2018   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
2        a      1  2019    300  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
3        a      2  2019    150  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
4        a      3  2019    300  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
5        a      4  2019   -500  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
6        a      5  2019   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
7        a      6  2019    600  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
8        a      7  2019   -400  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
9        a      8  2019   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1

You can explode the label column and join the dataframe with a dataframe created from your list, to avoid using an inefficient UDF:

from pyspark.sql.functions import monotonically_increasing_id, explode, col, when, collect_list, first, sum

# creating an id to group the exploded rows back together later
train_df = train_df.withColumn("id", monotonically_increasing_id())

# Exploding column
train_df = train_df.withColumn("label", explode("label"))

# Creation of dataframe with the top ten list
top_df = sqlContext.createDataFrame(
    [('bee',), ('references',), ('milking',), ('expert',), ('bombardier',),
     ('borscht',), ('distributions',), ('wires',), ('keyboard',), ('correlation',)],
    ['top']
)

# Join to keep elements
train_df = train_df.join(top_df, col("label") == col("top"), "left")

# Replace nulls with 0s or 1s
train_df = train_df.withColumn("top", when(col("top").isNull(),0).otherwise(1))

# Group results
train_df = train_df.groupby("id").agg(collect_list("label").alias("label"), first("sentences").alias("sentences"), sum("top").alias("new_label"))

# drop id and transform label column to be 1 or 0
train_df = train_df.withColumn("new_label", when(col("new_label")>0,1).otherwise(0))
train_df = train_df.drop("id")
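
As a quick usage check (just a sketch), the new_label column should be 1 for every row whose original label array shares at least one value with the top-ten list:

# Inspect the flag next to the reconstructed label arrays
train_df.select("label", "new_label").show(truncate=False)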

You don't need a udf and can avoid the overhead of explode + agg.

Spark version 2.4+

You can use pyspark.sql.functions.arrays_overlap:

import pyspark.sql.functions as F

top_ten_array = F.array(*[F.lit(val) for val in top_ten])

temp = train_df.withColumn(
    'new_label', 
    F.when(F.arrays_overlap('label', top_ten_array), 1.0).otherwise(0.0)
)

Alternatively, you can use pyspark.sql.functions.array_intersect():

temp = train_df.withColumn(
    'new_label', 
    F.when(
        F.size(F.array_intersect('label', top_ten_array)) > 0, 1.0
    ).otherwise(0.0)
)

Both of these check whether the size of the intersection of label and top_ten is non-zero.
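
For example, here is a small demonstration of both checks on a hypothetical one-row frame (assuming a SparkSession named spark and the top_ten_array defined above):

# Hypothetical one-row example: 'bee' is in the top ten, so both checks fire
demo = spark.createDataFrame([(['bee', 'we', 'eating'],)], ['label'])
demo.select(
    F.arrays_overlap('label', top_ten_array).alias('overlap'),           # true
    F.size(F.array_intersect('label', top_ten_array)).alias('n_common')  # 1
).show()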


For Spark versions 1.5 through 2.3, you can use array_contains in a loop over top_ten:

from operator import or_
from functools import reduce

temp = train_df.withColumn(
    'new_label',
    F.when(
        reduce(or_, [F.array_contains('label', val) for val in top_ten]),
        1.0
    ).otherwise(0.0)
)

You test whether label contains any of the values in top_ten, and reduce the results with a bitwise OR. This will only return True if one of the values in top_ten is contained in label.
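
For illustration, with a hypothetical two-value list the reduce builds exactly the column expression you would otherwise write out by hand:

# These two expressions are equivalent Column objects
expr_by_hand = F.array_contains('label', 'bee') | F.array_contains('label', 'milking')
expr_reduced = reduce(or_, [F.array_contains('label', val) for val in ['bee', 'milking']])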