Group rows based on changing pattern of a column PySpark
I'm struggling with a PySpark DataFrame containing conversation data, where every word is represented as a row. I would like to group everything each person says until the other person starts speaking. (There are only two speakers.)
I have tried some window functions, but I can't get the desired output.
Happy for any help!
Input:
| Call_id| Speaker | WordNum| Word |
|------- |----------| -------|------|
| 1 | Speaker_1| 1 |Hi |
| 1 | Speaker_1| 2 |I |
| 1 | Speaker_1| 3 |am |
| 1 | Speaker_1| 4 |Pete |
| 1 | Speaker_2| 5 |Hello |
| 1 | Speaker_1| 6 |Sorry |
| 1 | Speaker_1| 7 |Gotta |
| 1 | Speaker_1| 8 |Leave |
| 2 | Speaker_2| 1 |Hello |
| 2 | Speaker_2| 2 |Luis |
| 2 | Speaker_1| 3 |Hey |
Desired Output:
| Call_id| Speaker | Sentence |
|------- |----------| ----------------------------|
| 1 | Speaker_1| ["Hi", "I", "am", "Pete"] |
| 1 | Speaker_2| ["Hello"] |
| 1 | Speaker_1| ["Sorry", "Gotta", "Leave"] |
| 2 | Speaker_2| ["Hello", "Luis"] |
| 2 | Speaker_1| ["Hey"] |
You can create a sentence id, then group by call id, speaker, and sentence id and collect the words. To create the sentence id, check when the speaker changes by comparing the current speaker with the previous one, and build an indicator column that holds the word number when the speaker changes and 0 otherwise. The cumulative sum of that indicator then serves as the sentence id.
```python
import pyspark.sql.functions as F
from pyspark.sql import Window

# 1. create speaker lag column and fill in NAs with current speaker info
w = Window.partitionBy("Call_id").orderBy("WordNum")
df1 = (df
       .withColumn("speaker_lag", F.lag("Speaker").over(w))
       .withColumn("speaker_lag1", F.coalesce("speaker_lag", "Speaker")))

# 2. create sentence indicator
df2 = (df1
       .withColumn("session",
                   F.when(F.col("Speaker") == F.col("speaker_lag1"), 0)
                    .otherwise(F.col("WordNum"))))

# 3. create sentence id
w = (Window
     .partitionBy("Call_id")
     .orderBy("WordNum")
     .rangeBetween(Window.unboundedPreceding, 0))
df3 = df2.withColumn("Sentence_id", F.sum("session").over(w))

# 4. group by and collect
df4 = (df3
       .groupBy("Call_id", "Speaker", "Sentence_id")
       .agg(F.sort_array(F.collect_list(F.struct("WordNum", "Word")))
            .alias("collect"))
       .withColumn("Sentence", F.col("collect")["Word"])
       .select("Call_id", "Speaker", "Sentence_id", "Sentence"))
```
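As an aside, `collect_list` does not guarantee element order after a shuffle, which is why the code above collects `(WordNum, Word)` structs and sorts them with `sort_array` before extracting the words. If you prefer a 0/1 change flag over the word-number trick, the sentence id can also be built as a running count of speaker changes. A minimal sketch of that variant (the `change` column name is just illustrative):

```python
# Variant: flag speaker changes as 0/1 and take a running sum per call.
# Assumes WordNum uniquely orders the words within each call.
w_order = Window.partitionBy("Call_id").orderBy("WordNum")
w_cum = w_order.rowsBetween(Window.unboundedPreceding, 0)

df_alt = (df
          .withColumn("change",
                      F.coalesce(
                          (F.col("Speaker") != F.lag("Speaker").over(w_order)).cast("int"),
                          F.lit(0)))  # first word of a call counts as no change
          .withColumn("Sentence_id", F.sum("change").over(w_cum)))
```

The grouping and collection steps then stay the same as above.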
For your input data:
```python
cols = ["Call_id", "Speaker", "WordNum", "Word"]
data = [
    (1, "Speaker_1", 1, "Hi"),
    (1, "Speaker_1", 2, "I"),
    (1, "Speaker_1", 3, "am"),
    (1, "Speaker_1", 4, "Pete"),
    (1, "Speaker_2", 5, "Hello"),
    (1, "Speaker_1", 6, "Sorry"),
    (1, "Speaker_1", 7, "Gotta"),
    (1, "Speaker_1", 8, "Leave"),
    (2, "Speaker_2", 1, "Hello"),
    (2, "Speaker_2", 2, "Luis"),
    (2, "Speaker_1", 3, "Hey"),
]
df = spark.createDataFrame(data, schema=cols)
```
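To display the result in a deterministic order, you can sort before showing:

```python
df4.orderBy("Call_id", "Sentence_id").show(truncate=False)
```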
The output df4 will be:

| Call_id| Speaker  | Sentence_id| Sentence                    |
|--------|----------|------------|-----------------------------|
| 1      | Speaker_1| 0          | ["Hi", "I", "am", "Pete"]   |
| 1      | Speaker_2| 5          | ["Hello"]                   |
| 1      | Speaker_1| 11         | ["Sorry", "Gotta", "Leave"] |
| 2      | Speaker_2| 0          | ["Hello", "Luis"]           |
| 2      | Speaker_1| 3          | ["Hey"]                     |