How do I create a new column in my dataframe based on values contained in two different lists?
I have a pyspark dataframe like this:
+--------------------+--------------------+
| label| sentences|
+--------------------+--------------------+
|[things, we, eati...|<p>I am construct...|
|[elephants, nordi...|<p><strong>Edited...|
|[bee, cross-entro...|<p>I have a data ...|
|[milking, markers...|<p>There is an Ma...|
|[elephants, tease...|<p>I have Score d...|
|[references, gene...|<p>I'm looking fo...|
|[machines, exitin...|<p>I applied SVM ...|
+--------------------+--------------------+
And a top_ten list like this:
['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']
And I need to create a new label column indicating 1.0 or 0.0 depending on whether at least one of the label values exists in the top_ten list (for each row, of course).
While the logic makes sense, my inexperience with the syntax is getting in the way. Surely there is a short answer to this problem?
I tried:
temp = train_df.withColumn('label', F.when(lambda x: x.isin(top_ten), 1.0).otherwise(0.0))
And this:
def matching_top_ten(top_ten, labels):
    for label in labels:
        if label.isin(top_ten):
            return 1.0
        else:
            return 0.0
After this last attempt, I found out that these functions can't be mapped onto a dataframe. So I figured I could convert the column to an RDD, map it, and then .join() it back, but that sounds unnecessarily tedious.
**Update:** I tried the above function as a UDF as well, with no luck...
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
matching_udf = udf(matching_top_ten, FloatType())
temp = train_df.select('label', matching_udf(top_ten, 'label').alias('new_labels'))
----
TypeError: Invalid argument, not a string or column: [...top_ten list values...] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
I have found other, similar questions on SO; however, none of them involve the logic of validating a list against another list (at best, a single value against a list).
You can create a new column holding the top-ten list as an array, split the sentence column into separate words in an array, and then apply the udf as follows:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

top_ten_list = ['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']

# Add the top-ten list as an array column
df = df.withColumn("top_ten_list", F.array([F.lit(x) for x in top_ten_list]))

def matching_top_ten(normal_string, top_ten_ls):
    # 1 if the two word lists share at least one element, else 0
    if len(set(normal_string).intersection(set(top_ten_ls))) > 0:
        return 1
    return 0

matching_top_ten_udf = F.udf(matching_top_ten, IntegerType())

# Flag rows whose label array shares a word with the top-ten list
df = df.withColumn("label_flag", matching_top_ten_udf(F.col("label"), F.col("top_ten_list")))
# Or split the sentence column into words and flag against those instead
df = df.withColumn("split_sentence", F.split("sentence", " ")).withColumn("label_flag", matching_top_ten_udf(F.col("split_sentence"), F.col("top_ten_list")))
You can skip the first step, since I can see you already have the top_ten_list and a label array column.
Sample output using the df I used (it has a different schema from yours):
customer Month year spend ls1 sentence sentence_split label
0 a 11 2018 -800 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
1 a 12 2018 -800 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
2 a 1 2019 300 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
3 a 2 2019 150 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
4 a 3 2019 300 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
5 a 4 2019 -500 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
6 a 5 2019 -800 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
7 a 6 2019 600 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
8 a 7 2019 -400 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
9 a 8 2019 -800 [new, me] This is a new thing for me [This, is, a, new, thing, for, me] 1
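For reference, a minimal sketch of the same UDF pattern applied directly to the question's schema (this assumes train_df has the label array column shown above and top_ten is the Python list from the question):

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

top_ten = ['bee', 'references', 'milking', 'expert', 'bombardier',
           'borscht', 'distributions', 'wires', 'keyboard', 'correlation']

# Column literal holding the top-ten values
top_ten_col = F.array([F.lit(x) for x in top_ten])

def matching_top_ten(labels, top_ten_ls):
    # 1 if any label appears in the top-ten list, else 0
    return 1 if set(labels) & set(top_ten_ls) else 0

matching_top_ten_udf = F.udf(matching_top_ten, IntegerType())

train_df = train_df.withColumn(
    "new_label", matching_top_ten_udf(F.col("label"), top_ten_col)
)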
You can explode the label column and join the dataframe against a dataframe created from your list, to avoid using an inefficient UDF:
from pyspark.sql.functions import monotonically_increasing_id, explode, col, when, collect_list, first, sum

# Create an id to group the exploded rows back together later
train_df = train_df.withColumn("id", monotonically_increasing_id())

# Explode the label array into one row per label
train_df = train_df.withColumn("label", explode("label"))

# Create a dataframe with one row per top-ten value
top_df = sqlContext.createDataFrame(
    [(x,) for x in ['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']], ['top']
)

# Left join to mark the labels that appear in the top-ten list
train_df = train_df.join(top_df, col("label") == col("top"), "left")

# Replace nulls with 0s and matches with 1s
train_df = train_df.withColumn("top", when(col("top").isNull(), 0).otherwise(1))

# Group the exploded rows back together
train_df = train_df.groupby("id").agg(collect_list("label").alias("label"), first("sentences").alias("sentences"), sum("top").alias("new_label"))

# Drop the id and turn the new_label column into 1 or 0
train_df = train_df.withColumn("new_label", when(col("new_label") > 0, 1).otherwise(0))
train_df = train_df.drop("id")
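Since the top-ten dataframe only has a handful of rows, the join above can also be hinted as a broadcast join so the small table is shipped to every executor instead of shuffling train_df; a minimal sketch reusing the top_df created above:

from pyspark.sql.functions import broadcast, col

# Broadcast the small lookup table; the rest of the pipeline is unchanged
train_df = train_df.join(broadcast(top_df), col("label") == col("top"), "left")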
You don't need a udf, and you can avoid the overhead of explode + agg.
Spark version 2.4+
You can use pyspark.sql.functions.arrays_overlap:
import pyspark.sql.functions as F
top_ten_array = F.array(*[F.lit(val) for val in top_ten])
temp = train_df.withColumn(
    'new_label',
    F.when(F.arrays_overlap('label', top_ten_array), 1.0).otherwise(0.0)
)
Alternatively, you should also be able to use pyspark.sql.functions.array_intersect().
temp = train_df.withColumn(
    'new_label',
    F.when(
        F.size(F.array_intersect('label', top_ten_array)) > 0, 1.0
    ).otherwise(0.0)
)
Both of these check whether the size of the intersection of label and top_ten is non-zero.
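As another alternative, on Spark 3.1+ the same per-row check can be written with the higher-order function pyspark.sql.functions.exists instead of building a literal array; a sketch reusing F and the top_ten list from above:

# Spark 3.1+ only: true if any element of the label array is in top_ten
temp = train_df.withColumn(
    'new_label',
    F.when(F.exists('label', lambda x: x.isin(top_ten)), 1.0).otherwise(0.0)
)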
For Spark 1.5 through 2.3, you can use array_contains in a loop over top_ten:
from operator import or_
from functools import reduce
temp = train_df.withColumn(
    'new_label',
    F.when(
        reduce(or_, [F.array_contains('label', val) for val in top_ten]),
        1.0
    ).otherwise(0.0)
)
You test whether label contains any of the values in top_ten and reduce the results with a bitwise OR. This will only return True if any of the values in top_ten is contained in label.
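For illustration, the reduce simply chains the per-value checks with |; for the first three values of top_ten it expands to a condition equivalent to:

condition = (
    F.array_contains('label', 'bee')
    | F.array_contains('label', 'references')
    | F.array_contains('label', 'milking')
)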