Group rows based on changing pattern of a column PySpark

I'm struggling with a PySpark dataframe that contains meeting transcripts, where every single word is represented as a row. I would like to group everything one person says until the other person starts talking. (There are only two speakers.)

I have tried some window functions, but I can't get the desired output. Any help is appreciated!

Input:

| Call_id| Speaker  | WordNum| Word |
|------- |----------| -------|------|
| 1      | Speaker_1| 1      |Hi    |
| 1      | Speaker_1| 2      |I     |
| 1      | Speaker_1| 3      |am    |
| 1      | Speaker_1| 4      |Pete  |
| 1      | Speaker_2| 5      |Hello |
| 1      | Speaker_1| 6      |Sorry |
| 1      | Speaker_1| 7      |Gotta |
| 1      | Speaker_1| 8      |Leave |
| 2      | Speaker_2| 1      |Hello |
| 2      | Speaker_2| 2      |Luis  |
| 2      | Speaker_1| 3      |Hey   |

Desired Output:

| Call_id| Speaker  | Sentence                    | 
|------- |----------| ----------------------------|
| 1      | Speaker_1| ["Hi", "I", "am", "Pete"]   |
| 1      | Speaker_2| ["Hello"]                   |
| 1      | Speaker_1| ["Sorry", "Gotta", "Leave"] |
| 2      | Speaker_2| ["Hello", "Luis"]           |
| 2      | Speaker_1| ["Hey"]                     |

You can create a sentence id, then group by call id, speaker and sentence id and collect the words. To create the sentence id, check when the speaker changes by comparing the current value with the previous one. Build an indicator column that takes the word number when the speaker changes and 0 otherwise. The cumulative sum of that indicator then serves as the sentence id.

import pyspark.sql.functions as F
from pyspark.sql import Window

# 1  create speaker lag column and fill in NAs with current speaker info
w = Window.partitionBy("Call_id").orderBy("WordNum")
df1 = (df
       .withColumn("speaker_lag", F.lag("Speaker").over(w))
       .withColumn("speaker_lag1", F.coalesce("speaker_lag", "Speaker")))

# 2 create sentence indicator
df2 = (df1
       .withColumn("session", 
                   F.when(F.col("Speaker")==F.col("speaker_lag1"), 0)
                    .otherwise(F.col("WordNum"))))

# 3 create sentence id as cumulative sum of the indicator
w = (Window
      .partitionBy("Call_id")
      .orderBy("WordNum")
      .rangeBetween(Window.unboundedPreceding, 0))
df3 = df2.withColumn("Sentence_id", F.sum("session").over(w))

# 4 group by and collect the words of each sentence
df4 = (df3
       .groupBy("Call_id", "Speaker", "Sentence_id")
       .agg(F.sort_array(F.collect_list(F.struct("WordNum", "Word")))
            .alias("collect"))
       .withColumn("Sentence", F.col("collect")["Word"])
       .select("Call_id", "Speaker", "Sentence_id", "Sentence")
      )
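Note that collect_list does not guarantee the order of the collected elements after a shuffle, which is why the words are collected as (WordNum, Word) structs and sorted with sort_array before the Word field is extracted.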

For your input data:

cols = ["Call_id", "Speaker", "WordNum", "Word"]
data = [
( 1      , "Speaker_1", 1      ,"Hi"    ),
( 1      , "Speaker_1", 2      ,"I"     ),
( 1      , "Speaker_1", 3      ,"am"    ),
( 1      , "Speaker_1", 4      ,"Pete"  ),
( 1      , "Speaker_2", 5      ,"Hello" ),
( 1      , "Speaker_1", 6      ,"Sorry" ),
( 1      , "Speaker_1", 7      ,"Gotta" ),
( 1      , "Speaker_1", 8      ,"Leave" ),
( 2      , "Speaker_2", 1      ,"Hello" ),
( 2      , "Speaker_2", 2      ,"Luis" ),
( 2      , "Speaker_1", 3      ,"Hey"  )]

df = spark.createDataFrame(data, schema=cols)

The output df4 will then match the desired output above, with an additional Sentence_id column.
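
If you want to display the result in the same order as the tables above, a quick check could look like this (the explicit orderBy is only for display, since the row order after groupBy is not guaranteed):

# show one row per speaker turn: Call_id, Speaker, Sentence_id, Sentence
df4.orderBy("Call_id", "Sentence_id").show(truncate=False)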