Finding common words across rows using pyspark/pandas
I have a text file with a pipe (|) delimiter, as shown below:
person_id | category | notes
1|A|He bought cat
1|A|He bought dog
1|B|He has hen
2|A|Switzerland Australia
2|A|Australia
I want to group by person_id and category and find only the words that repeat across all rows of each group.
Expected output:
1|A|He bought
1|B|He has hen
2|A|Australia
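I already have the word counts for each person_id and category group, but I can't get from there to the expected output.
I used an RDD word count and Spark SQL.
Using group by as below, I got these word counts:
person_id | category | notes
1|A|He(2) bought(2) cat(1) dog(1)
1|B|He(1) has(1) hen(1)
2|A|Switzerland(1) Australia(2)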
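For reference, the per-group word counts above can be reproduced without Spark. This is a minimal plain-Python sketch, assuming the file content is exactly the pipe-delimited sample shown, with a header row whose field names are padded with spaces (hence the strip() calls). `SAMPLE`, `load_rows`, and `word_counts` are hypothetical names.

```python
import csv
import io
from collections import Counter, defaultdict

# Hypothetical stand-in for the pipe-delimited file from the question.
SAMPLE = """person_id | category | notes
1|A|He bought cat
1|A|He bought dog
1|B|He has hen
2|A|Switzerland Australia
2|A|Australia
"""

def load_rows(text):
    """Parse pipe-delimited text into (person_id, category, notes) tuples."""
    reader = csv.reader(io.StringIO(text), delimiter="|")
    next(reader)  # skip the header row
    # Strip surrounding spaces from every field and skip blank lines.
    return [tuple(field.strip() for field in row) for row in reader if row]

def word_counts(rows):
    """Count word occurrences per (person_id, category) group."""
    counts = defaultdict(Counter)
    for person_id, category, notes in rows:
        counts[(person_id, category)].update(notes.split())
    return counts

rows = load_rows(SAMPLE)
for key, counter in word_counts(rows).items():
    print(key, dict(counter))
```

The counts alone do not answer the question: `He(2)` only means "in every note" once it is compared with the number of notes in the group (two for `1|A`).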
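You can split the notes string into words and then explode the words. Then count the number of notes per person_id, category and the number of occurrences of each word per person_id, category. Where the two counts are equal, the word (assuming it does not repeat within a single note) appears in every note of the group, so collect those words with collect_list to build the result.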
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

data = [(1, "A", "He bought cat"),
        (1, "A", "He bought dog"),
        (1, "B", "He has hen"),
        (2, "A", "Switzerland Australia"),
        (2, "A", "Australia"),]
df = spark.createDataFrame(data, ("person_id", "category", "notes", ))

# Number of notes (rows) per (person_id, category)
window_spec_category = Window.partitionBy("person_id", "category")
# Keep category_count explicitly in the select so the filter below can reference it
df_word = df.withColumn("category_count", F.count("*").over(window_spec_category))\
    .select("person_id", "category", "category_count",
            F.posexplode(F.split(F.col("notes"), " ")).alias("pos", "word"))

# Occurrences of each word per (person_id, category); keep one row per matching word
window_spec_word = Window.partitionBy("person_id", "category", "word")
matching_words = df_word.withColumn("word_count", F.count("*").over(window_spec_word))\
    .withColumn("rn", F.row_number().over(window_spec_word.orderBy(F.lit(None))))\
    .filter(F.col("word_count") == F.col("category_count"))\
    .filter(F.col("rn") == F.lit(1))\
    .drop("rn")

# Collect the surviving words per group, ordered by their position in the note
Output:
+---------+--------+----------+
|person_id|category| result|
+---------+--------+----------+
| 1| A| He bought|
| 1| B|He has hen|
| 2| A| Australia|
+---------+--------+----------+
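The counting rule behind this answer can be checked in plain Python. This is a sketch of the logic, not the Spark code; `common_by_count` is a hypothetical helper. It deduplicates words within each note first, because otherwise a word repeated inside one note (for example `a a` next to `b`) would reach the note count without appearing in every note. The Spark code above would need the same per-note deduplication (for example `F.array_distinct` on the split array before `posexplode`) to handle that case.

```python
from collections import Counter

def common_by_count(notes):
    """Return the words that occur in every note, using the count rule.

    A word is common when the number of notes containing it equals the
    number of notes in the group. Words are deduplicated per note first,
    so repeats inside a single note are not double-counted.
    """
    per_note = [list(dict.fromkeys(n.split())) for n in notes]
    note_count = len(per_note)
    # In how many notes each word appears (at most once per note).
    seen = Counter(word for words in per_note for word in words)
    # First-seen order across the group, for a stable result.
    order = dict.fromkeys(word for words in per_note for word in words)
    return " ".join(w for w in order if seen[w] == note_count)

print(common_by_count(["He bought cat", "He bought dog"]))  # He bought
print(common_by_count(["Switzerland Australia", "Australia"]))  # Australia
print(common_by_count(["a a", "b"]))  # prints an empty line
```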
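You can do it with Spark array functions:
- split the notes column to get an array of words
- group by person_id and category and collect the word arrays into a list
- flatten and deduplicate the collected arrays to get the candidate words
- use the higher-order function filter to keep only the candidate words that appear in every collected sub-array (i.e. in the words of every note)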
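import pyspark.sql.functions as F

# df is the (person_id, category, notes) DataFrame built from the question's data.
# w holds the distinct candidate words; a candidate x is kept only if the number
# of notes containing x equals the number of notes in the group.
df1 = df.withColumn("notes", F.split("notes", " ")) \
    .groupBy("person_id", "category") \
    .agg(F.collect_list(F.col("notes")).alias("notes")) \
    .withColumn("w", F.array_distinct(F.flatten("notes"))) \
    .withColumn("notes", F.array_join(F.expr("filter(w, x -> size(filter(notes, y -> array_contains(y, x))) = size(notes))"), " ")) \
    .drop("w")
df1.show(truncate=False)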
#+---------+--------+----------+
#|person_id|category|notes |
#+---------+--------+----------+
#|1 |A |He bought |
#|1 |B |He has hen|
#|2 |A |Australia |
#+---------+--------+----------+
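The filter expression reads as: for each distinct candidate word x, count the notes y that contain x, and keep x only if that count equals the number of notes. A plain-Python mirror of that logic may make it easier to verify; this is a sketch, `common_by_filter` is a hypothetical name, and the Spark functions appear only in comments.

```python
def common_by_filter(notes):
    """Plain-Python mirror of
    filter(w, x -> size(filter(notes, y -> array_contains(y, x))) = size(notes))
    """
    arrays = [n.split(" ") for n in notes]           # like F.split(notes, " ")
    flat = [word for arr in arrays for word in arr]  # like F.flatten
    candidates = list(dict.fromkeys(flat))           # like F.array_distinct; keeps first-seen order
    return " ".join(                                 # like F.array_join(..., " ")
        x for x in candidates
        if sum(1 for y in arrays if x in y) == len(arrays)
    )

print(common_by_filter(["He bought cat", "He bought dog"]))  # He bought
print(common_by_filter(["Switzerland Australia", "Australia"]))  # Australia
```

Because the membership test counts notes rather than word occurrences, a word repeated inside one note is only counted once, so this approach does not have the duplicate-word caveat of the counting approach.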
Another option is to define a UDF that finds the intersection of the word sets of the notes in each group:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data = [
    {"person_id": 1, "category": "A", "notes": "He bought cat"},
    {"person_id": 1, "category": "A", "notes": "He bought dog"},
    {"person_id": 1, "category": "B", "notes": "He has hen"},
    {"person_id": 2, "category": "A", "notes": "Switzerland Australia"},
    {"person_id": 2, "category": "A", "notes": "Australia"},
]

def common(x):
    # Split each note into words, intersect the word sets, and keep the
    # order in which the shared words appear in the first note
    l = [i.split() for i in x]
    return " ".join(sorted(set.intersection(*map(set, l)), key=l[0].index))

df = spark.createDataFrame(data)
df = df.groupBy(["person_id", "category"]).agg(F.collect_list("notes").alias("b"))
df = df.withColumn("result", F.udf(common)(F.col("b")))
df.select("person_id", "category", "result").show(truncate=False)
Result:
+---------+--------+----------+
|person_id|category|result |
+---------+--------+----------+
|1 |A |He bought |
|1 |B |He has hen|
|2 |A |Australia |
+---------+--------+----------+
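Since `common` is plain Python, its logic can be tested without a Spark session before it is wrapped in `F.udf`. The variant below is a sketch with one added guard, under the assumption that a group can end up with an empty list (for example when all of its notes are null, since `collect_list` skips nulls); in that case the original `set.intersection(*...)` call raises a `TypeError`. `common_safe` is a hypothetical name.

```python
def common_safe(x):
    """Return the words shared by every note, in the order of the first note.

    Returns an empty string for an empty list instead of raising TypeError.
    """
    if not x:
        return ""
    l = [i.split() for i in x]
    shared = set.intersection(*map(set, l))
    return " ".join(sorted(shared, key=l[0].index))

print(common_safe(["He bought cat", "He bought dog"]))  # He bought
print(repr(common_safe([])))  # ''
```

If the guard is wanted in Spark, it can be swapped in as `F.udf(common_safe)(F.col("b"))`.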